Module Gnumed.business.gmExportArea

GNUmed export area

Think shopping cart in a web shop.

This is where you want to put documents for further processing by you or someone else, like your secretary.

Expand source code
"""GNUmed export area

Think shopping cart in a web shop.

This is where you want to put documents for further
processing by you or someone else, like your secretary.
"""
#============================================================
__license__ = "GPL v2 or later"
__author__ = "K.Hilbert <Karsten.Hilbert@gmx.net>"


import sys
import logging
import shutil
import os
import platform
import hashlib


if __name__ == '__main__':
        sys.path.insert(0, '../../')
        _ = lambda x:x
from Gnumed.pycommon import gmTools
from Gnumed.pycommon import gmBusinessDBObject
from Gnumed.pycommon import gmPG2
from Gnumed.pycommon import gmMimeLib
from Gnumed.pycommon import gmDateTime
from Gnumed.pycommon import gmCfgINI
from Gnumed.pycommon import gmCrypto

from Gnumed.business import gmDocuments
from Gnumed.business import gmKeywordExpansion
from Gnumed.business import gmStaff


_log = logging.getLogger('gm.exp_area')
_cfg = gmCfgINI.gmCfgData()

PRINT_JOB_DESIGNATION = 'print'
DOCUMENTS_SUBDIR = 'documents'
DIRENTRY_README_NAME = '.README.GNUmed-DIRENTRY'

#============================================================
# export area item handling
#------------------------------------------------------------
_SQL_get_export_items = "SELECT * FROM clin.v_export_items WHERE %s"

class cExportItem(gmBusinessDBObject.cBusinessDBObject):
        """Represents an item in the export area table"""

        _cmd_fetch_payload = _SQL_get_export_items % "pk_export_item = %s"
        _cmds_store_payload = [
                """UPDATE clin.export_item SET
                                fk_identity = %(pk_identity)s,
                                created_by = gm.nullify_empty_string(%(created_by)s),
                                created_when = %(created_when)s,
                                designation = gm.nullify_empty_string(%(designation)s),
                                description = gm.nullify_empty_string(%(description)s),
                                fk_doc_obj = %(pk_doc_obj)s,
                                data = CASE
                                        WHEN %(pk_doc_obj)s IS NULL THEN coalesce(data, 'to be replaced by real data')
                                        ELSE NULL
                                END,
                                filename = CASE
                                        WHEN %(pk_doc_obj)s IS NULL THEN gm.nullify_empty_string(%(filename)s)
                                        ELSE NULL
                                END
                        WHERE
                                pk = %(pk_export_item)s
                                        AND
                                xmin = %(xmin_export_item)s
                """,
                "SELECT clin.export_item_set_list_position(%(pk_export_item)s, %(list_position)s)",
                _SQL_get_export_items % 'pk_export_item = %(pk_export_item)s'
        ]
        _updatable_fields = [
                'pk_identity',
                'created_when',
                'designation',
                'description',
                'pk_doc_obj',
                'filename',
                'list_position'
        ]
        #--------------------------------------------------------
        def __init__(self, aPK_obj=None, row=None, link_obj=None):
                super(cExportItem, self).__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)
                # force auto-healing if need be
                if self._payload['pk_identity_raw_needs_update']:
                        _log.warning (
                                'auto-healing export item [%s] from identity [%s] to [%s] because of document part [%s] seems necessary',
                                self._payload['pk_export_item'],
                                self._payload['pk_identity_raw'],
                                self._payload['pk_identity'],
                                self._payload['pk_doc_obj']
                        )
                        if self._payload['pk_doc_obj'] is None:
                                _log.error('however, .fk_doc_obj is NULL, which should not happen, leaving things alone for manual inspection')
                                return
                        # only flag ourselves as modified, do not actually
                        # modify any values, better safe than sorry
                        self._is_modified = True
                        self.save()
                        self.refetch_payload(ignore_changes = False, link_obj = link_obj)

        #--------------------------------------------------------
#       def format(self):
#               return u'%s' % self
        #--------------------------------------------------------
        def update_data(self, data=None):
                assert (data is not None), '<data> must not be <None>'

                if self.is_DIRENTRY or self.is_document_part:
                        return False

                SQL = """
                        UPDATE clin.export_item SET
                                data = %(data)s::bytea,
                                fk_doc_obj = NULL
                        WHERE pk = %(pk)s"""
                args = {'pk': self.pk_obj, 'data': data}
                gmPG2.run_rw_queries(queries = [{'cmd': SQL, 'args': args}], return_data = False)
                # must update XMIN now ...
                self.refetch_payload()
                return True

        #--------------------------------------------------------
        def update_data_from_file(self, filename=None, convert_document_part=False):

                if self.is_DIRENTRY:
                        return False

                if self.is_document_part:
                        if not convert_document_part:
                                return False

                # sanity check
                if not (os.access(filename, os.R_OK) and os.path.isfile(filename)):
                        _log.error('[%s] is not a readable file' % filename)
                        return False

                cmd = """
                        UPDATE clin.export_item SET
                                data = %(data)s::bytea,
                                fk_doc_obj = NULL,
                                filename = gm.nullify_empty_string(%(fname)s)
                        WHERE pk = %(pk)s"""
                args = {'pk': self.pk_obj, 'fname': filename}
                if not gmPG2.file2bytea(query = cmd, filename = filename, args = args):
                        return False

                # must update XMIN now ...
                self.refetch_payload()
                return True

        #--------------------------------------------------------
        def save_to_file(self, aChunkSize:int=0, filename:str=None, directory:str=None, passphrase:str=None, convert2pdf:bool=False) -> str:
                """Save this entry to disk.

                Args:
                        filename: The filename to save into, unless this is a DIRENTRY.
                        directory: The directory to save the DIRENTRY _content into, if this is a DIRENTRY.
                        passphrase: Encrypt file(s) with this passphrase, if not _None_.
                        convert2pdf: Convert file(s) to PDF on the way out. Before encryption, that is.

                Returns:
                        Directory for DIRENTRIES, or filename, or None on failure.
                """
                if self.is_DIRENTRY and convert2pdf:
                        # cannot convert dir entries to PDF
                        return None

                # data linked from archive ?
                part_fname = self.__save_doc_obj (
                        filename = filename,
                        directory = directory,
                        passphrase = passphrase,
                        convert2pdf = convert2pdf
                )
                if part_fname is False:
                        return None

                if part_fname is not None:
                        return part_fname

                # valid DIRENTRY ?
                if self.is_valid_DIRENTRY:
                        target_dir = self.__save_direntry (
                                directory,
                                passphrase = passphrase
                        )
                        if target_dir is False:
                                return None

                        if target_dir is not None:
                                return target_dir

                # but still a DIRENTRY ?
                if self.is_DIRENTRY:
                        # yes, but apparently not valid (anymore), say,
                        # other machine or dir not found locally
                        return None

                # normal item with data in export area table
                return self.__save_normal_item (
                        filename = filename,
                        directory = directory,
                        passphrase = passphrase,
                        convert2pdf = convert2pdf
                )

        #--------------------------------------------------------
        def display_via_mime(self, chunksize=0, block=None):

                # document part
                if self._payload['pk_doc_obj'] is not None:
                        return self.document_part.display_via_mime(chunksize = chunksize, block = block)

                # DIR entry
                if self._payload['filename'].startswith('DIR::'):
                        # FIXME: error handling with malformed entries
                        tag, node, path = self._payload['filename'].split('::', 2)
                        if node != platform.node():
                                msg = _(
                                        'This item points to a directory on the computer named:\n'
                                        ' %s\n'
                                        'You are, however, currently using another computer:\n'
                                        ' %s\n'
                                        'Directory items can only be viewed/saved/exported\n'
                                        'on the computer they are pointing to.'
                                ) % (node, platform.node())
                                return False, msg
                        success, msg = gmMimeLib.call_viewer_on_file(path, block = block)
                        return success, msg

                # data -> save
                fname = self.save_to_file(aChunkSize = chunksize)
                if fname is None:
                        return False, ''

                success, msg = gmMimeLib.call_viewer_on_file(fname, block = block)
                if not success:
                        return False, msg

                return True, ''

        #--------------------------------------------------------
        def get_useful_filename(self, patient=None, directory=None):
                patient_part = ''
                if patient is not None:
                        patient_part = '-%s' % patient.subdir_name

                # preserve original filename extension if available
                suffix = '.dat'
                if self._payload['filename'] is not None:
                        tmp, suffix = os.path.splitext (
                                gmTools.fname_sanitize(self._payload['filename']).casefold()
                        )
                        if suffix == '':
                                suffix = '.dat'
                fname = gmTools.get_unique_filename (
                        prefix = '%s-gm-export_item%s-' % (self._payload['list_position'], patient_part),
                        suffix = suffix,
                        tmp_dir = directory
                )
                return fname

        #--------------------------------------------------------
        # helpers
        #--------------------------------------------------------
        def __save_normal_item(self, filename:str=None, directory:str=None, passphrase:str=None, convert2pdf:bool=False) -> str:
                """Saves item to disk where item is a "normal" row in the export area.

                Item must not be a DIRENTRY nor a link pointing to a
                document part in the archive.

                Always eventually use the return value as the
                canonical filename for further processing and do not
                rely on <filename> being returned as-is. This may be
                due to <filename> not carrying a path (in which case
                <directory> is used), or the exported file being
                converted to PDF and/or encrypted which may change
                the file extension.

                Args:
                        filename: a target filename, or rather, a template for same
                        directory: target directory, when filename is not given, or does not include a path

                Returns:
                        The ultimate name of the file, saved, converted, and encrypted, as instructed.
                """
                #_log.debug('<filename> %s', filename)
                #_log.debug('<directory> %s', directory)
                tmp_fname = gmTools.get_unique_filename()
                _log.debug('temporary dump file: %s', tmp_fname)
                # make sure output file eventually ends up in dir-of-<filename> or in <directory>
                target_path = None
                if filename:
                        target_path = gmTools.fname_dir(filename)
                if not target_path:
                        target_path = directory
                if not target_path:
                        target_path = gmTools.fname_dir(tmp_fname)
                if target_path.strip() == '':
                        target_path = None
                _log.debug('target path: %s', target_path)
                SQL = 'SELECT substring(data FROM %(start)s FOR %(size)s) FROM clin.export_item WHERE pk = %(pk)s'
                success = gmPG2.bytea2file (
                        data_query = {'cmd': SQL, 'args': {'pk': self.pk_obj}},
                        filename = tmp_fname,
                        data_size = self._payload['size']
                )
                if not success:
                        return None

                tmp_fname = gmMimeLib.adjust_extension_by_mimetype(tmp_fname)
                #_log.debug('extension-adjusted temporary dump file: %s', tmp_fname)
                if convert2pdf:
                        pdf_fname = gmMimeLib.convert_file(filename = tmp_fname, target_mime = 'application/pdf', target_extension = '.pdf')
                        if not pdf_fname:
                                return None

                        tmp_fname = pdf_fname
                        _log.debug('converted to pdf: %s', tmp_fname)

                if not passphrase:
                        if filename:
                                target_fname = os.path.join(target_path, gmTools.fname_from_path(filename))
                                #_log.debug('target filename from passed in name: %s', target_fname)
                        else:
                                target_fname = self.get_useful_filename(directory = target_path)
                                #_log.debug('generated target filename: %s', target_fname)
                        if not gmTools.rename_file(tmp_fname, target_fname, overwrite = True, allow_symlink = True):
                                return None

                        ext = None
                        if filename:
                                ext = gmTools.fname_extension(filename)
                        if not ext:                     # either empty or filename not passed in
                                target_fname = gmMimeLib.adjust_extension_by_mimetype(target_fname)
                                _log.debug('extension-adjusted target file: %s', target_fname)
                        return target_fname

                enc_fname = gmCrypto.encrypt_file (
                        filename = tmp_fname,
                        passphrase = passphrase,
                        verbose = _cfg.get(option = 'debug'),
                        remove_unencrypted = True,
                        convert2pdf = False     # already done, if desired
                )
                #_log.debug('encrypted file: %s', enc_fname)
                removed = gmTools.remove_file(tmp_fname)
                if enc_fname is None:
                        _log.error('cannot encrypt or, possibly, convert')
                        return None

                if not removed:
                        _log.error('cannot remove unencrypted file')
                        gmTools.remove_file(enc_fname)
                        return None

                target_fname = os.path.join(target_path, gmTools.fname_from_path(enc_fname))
                if not gmTools.rename_file(enc_fname, target_fname, overwrite = True, allow_symlink = True):
                        gmTools.remove_file(enc_fname)
                        return None

                _log.debug('generated target filename: %s', target_fname)
                return target_fname

        #--------------------------------------------------------
        def __save_doc_obj(self, filename=None, directory=None, passphrase=None, convert2pdf:bool=False):
                """Save doc object part into target.

                None: not a doc obj
                True: success
                False: failure (to save/convert/encrypt/remove unencrypted)
                """
                if self._payload['pk_doc_obj'] is None:
                        return None

                part = self.document_part
                if not filename:
                        filename = part.get_useful_filename (
                                make_unique = False,
                                directory = directory,
                                include_gnumed_tag = False,
                                date_before_type = True,
                                name_first = False
                        )
                path, name = os.path.split(filename)
                filename = os.path.join(path, '%s-%s' % (self._payload['list_position'], name))
                if convert2pdf:
                        target_mime = 'application/pdf'
                        target_ext = '.pdf'
                else:
                        target_mime = None
                        target_ext = None
                part_fname = part.save_to_file (
                        filename = filename,
                        target_mime = target_mime,
                        target_extension = target_ext,
                        ignore_conversion_problems = False
                )
                if part_fname is None:
                        _log.error('cannot save document part to file')
                        return False

                if passphrase is None:
                        return part_fname

                enc_filename = gmCrypto.encrypt_file (
                        filename = part_fname,
                        passphrase = passphrase,
                        verbose = _cfg.get(option = 'debug'),
                        remove_unencrypted = True
                )
                removed = gmTools.remove_file(part_fname)
                if enc_filename is None:
                        _log.error('cannot encrypt')
                        return False

                if not removed:
                        _log.error('cannot remove unencrypted file')
                        gmTools.remove_file(enc_filename)
                        return None

                if not filename:
                        return enc_filename

                # make sure encrypted file ends up in dir-of-filename
                target_fname = os.path.join (
                        gmTools.fname_dir(filename),
                        gmTools.fname_from_path(enc_filename)
                )
                if not gmTools.rename_file(enc_filename, target_fname, overwrite = True, allow_symlink = True):
                        gmTools.remove_file(enc_filename)
                        return None

                return target_fname

        #--------------------------------------------------------
        def __save_direntry(self, directory=None, passphrase=None):
                """Move DIRENTRY source into target.

                None: not a DIRENTRY
                True: success
                False: failure
                """
                # do not process malformed entries
                try:
                        tag, node, local_fs_path = self._payload['filename'].split('::', 2)
                except ValueError:
                        _log.exception('malformed DIRENTRY: [%s]', self._payload['filename'])
                        return False
                # source and target paths must not overlap
                if directory is None:
                        directory = gmTools.mk_sandbox_dir(prefix = 'exp-')
                if directory.startswith(local_fs_path):
                        _log.error('cannot dump DIRENTRY item [%s]: must not be subdirectory of target dir [%s]', self._payload['filename'], directory)
                        return False
                if local_fs_path.startswith(directory):
                        _log.error('cannot dump DIRENTRY item [%s]: target dir [%s] must not be subdirectory of DIRENTRY', self._payload['filename'], directory)
                        return False

                _log.debug('dumping DIRENTRY item [%s] into [%s]', self._payload['filename'], directory)
                sandbox_dir = gmTools.mk_sandbox_dir()
                _log.debug('sandbox: %s', sandbox_dir)
                tmp = gmTools.copy_tree_content(local_fs_path, sandbox_dir)
                if tmp is None:
                        _log.error('cannot dump DIRENTRY item [%s] into [%s]: copy error', self._payload['filename'], sandbox_dir)
                        return False

                gmTools.remove_file(os.path.join(tmp, DIRENTRY_README_NAME))
                if passphrase is not None:
                        _log.debug('encrypting sandbox: %s', sandbox_dir)
                        encrypted = gmCrypto.encrypt_directory_content (
                                directory = sandbox_dir,
                                passphrase = passphrase,
                                verbose = _cfg.get(option = 'debug'),
                                remove_unencrypted = True
                        )
                        if not encrypted:
                                _log.error('cannot dump DIRENTRY item [%s]: encryption problem in [%s]', self._payload['filename'], sandbox_dir)
                                return False

                tmp = gmTools.copy_tree_content(sandbox_dir, directory)
                if tmp is None:
                        _log.debug('cannot dump DIRENTRY item [%s] into [%s]: copy error', self._payload['filename'], directory)
                        return False

                return directory

        #--------------------------------------------------------
        # properties
        #--------------------------------------------------------
        def _get_is_doc_part(self):
                return self._payload['pk_doc_obj'] is not None

        is_document_part = property(_get_is_doc_part)

        #--------------------------------------------------------
        def _get_doc_part(self):
                if self._payload['pk_doc_obj'] is None:
                        return None
                return gmDocuments.cDocumentPart(aPK_obj = self._payload['pk_doc_obj'])

        document_part = property(_get_doc_part)

        #--------------------------------------------------------
        def _get_is_print_job(self):
                return self._payload['designation'] == PRINT_JOB_DESIGNATION

        def _set_is_print_job(self, is_print_job):
                desig = gmTools.bool2subst(is_print_job, PRINT_JOB_DESIGNATION, None, None)
                if self._payload['designation'] == desig:
                        return
                self['designation'] = desig
                self.save()

        is_print_job = property(_get_is_print_job, _set_is_print_job)

        #--------------------------------------------------------
        def _is_DIRENTRY(self):
                """Check whether this item looks like a DIRENTRY."""
                if self._payload['filename'] is None:
                        return False
                if not self._payload['filename'].startswith('DIR::'):
                        return False
                if len(self._payload['filename'].split('::', 2)) != 3:
                        _log.exception('DIRENTRY [%s]: malformed', self._payload['filename'])
                        return False
                return True

        is_DIRENTRY = property(_is_DIRENTRY)

        #--------------------------------------------------------
        def _is_local_DIRENTRY(self):
                """Check whether this item is a _local_ DIRENTRY."""
                if not self.is_DIRENTRY:
                        return False

                tag, node, local_fs_path = self._payload['filename'].split('::', 2)
                if node == platform.node():
                        return True

                _log.warning('DIRENTRY [%s]: not on this machine (%s)', self._payload['filename'], platform.node())
                return False

        is_local_DIRENTRY = property(_is_local_DIRENTRY)

        #--------------------------------------------------------
        def _is_valid_DIRENTRY(self):
                """Check whether this item is a _valid_ DIRENTRY."""
                if not self.is_local_DIRENTRY:
                        return False

                tag, node, local_fs_path = self._payload['filename'].split('::', 2)
                # valid path ?
                if not os.path.isdir(local_fs_path):
                        _log.warning('DIRENTRY [%s]: directory not found (old DIRENTRY ?)', self._payload['filename'])
                        return False
                return True

        is_valid_DIRENTRY = property(_is_valid_DIRENTRY)

        #--------------------------------------------------------
        def _get_DIRENTRY_node(self):
                if not self.is_DIRENTRY:
                        return None

                try:
                        tag, node, local_fs_path = self._payload['filename'].split('::', 2)
                except ValueError:
                        # should not happen because structure already checked in .is_DIRENTRY,
                        # better safe than sorry
                        _log.exception('DIRENTRY [%s]: malformed', self._payload['filename'])
                        return None

                return node

        DIRENTRY_node = property(_get_DIRENTRY_node)

        #--------------------------------------------------------
        def _get_DIRENTRY_path(self):
                if not self.is_DIRENTRY:
                        return None

                try:
                        tag, node, local_fs_path = self._payload['filename'].split('::', 2)
                except ValueError:
                        # should not happen because structure already checked in .is_DIRENTRY,
                        # better safe than sorry
                        _log.exception('DIRENTRY [%s]: malformed', self._payload['filename'])
                        return None

                return local_fs_path

        DIRENTRY_path = property(_get_DIRENTRY_path)

        #--------------------------------------------------------
        def _is_DICOM_directory(self):
                """Check whether this item points to a DICOMDIR."""
                if not self.is_valid_DIRENTRY:
                        return False

                tag, node, local_fs_path = self._payload['filename'].split('::', 2)
                found_DICOMDIR = False
                for fs_entry in os.listdir(local_fs_path):
                        # found a subdir
                        if os.path.isdir(os.path.join(local_fs_path, fs_entry)):
                                # allow for any number of subdirs
                                continue
                        # found a file
                        if fs_entry != 'DICOMDIR':
                                # not named "DICOMDIR" -> not a DICOMDIR DIRENTRY
                                return False

                        # must be named DICOMDIR -> that's the only file allowed (and required) in ./
                        found_DICOMDIR = True
                return found_DICOMDIR

        is_DICOM_directory = property(_is_DICOM_directory)

        #--------------------------------------------------------
        def _has_files_in_root(self):
                """True if there are files in the root directory."""
                tag, node, local_fs_path = self._payload['filename'].split('::', 2)
                for fs_entry in os.listdir(local_fs_path):
                        if os.path.isfile(fs_entry):
                                _log.debug('has files in top level: %s', local_fs_path)
                                return True
                return False

        has_files_in_root = property(_has_files_in_root)

#------------------------------------------------------------
def get_export_items(order_by=None, pk_identity=None, designation=None, return_pks=False):

        args = {
                'pat': pk_identity,
                'desig': gmTools.coalesce(designation, PRINT_JOB_DESIGNATION)
        }
        where_parts = []
        if pk_identity is not None:
                where_parts.append('pk_identity = %(pat)s')
                # note that invalidly linked items will be
                # auto-healed when instantiated
        if designation is None:
                where_parts.append("designation IS DISTINCT FROM %(desig)s")
        else:
                where_parts.append('designation = %(desig)s')
        if order_by is None:
                order_by = 'pk_identity, list_position'
        order_by = ' ORDER BY %s' % order_by
        cmd = (_SQL_get_export_items % ' AND '.join(where_parts)) + order_by
        rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])
        if return_pks:
                return [ r['pk_export_item'] for r in rows ]

        return [ cExportItem(row = {'data': r, 'pk_field': 'pk_export_item'}) for r in rows ]

#------------------------------------------------------------
def get_print_jobs(order_by=None, pk_identity=None):
        return get_export_items(order_by = order_by, pk_identity = pk_identity, designation = PRINT_JOB_DESIGNATION)

#------------------------------------------------------------
def create_export_item(description=None, pk_identity=None, pk_doc_obj=None, filename=None):

        args = {
                'desc': description,
                'pk_obj': pk_doc_obj,
                'pk_pat': pk_identity,
                'fname': filename
        }
        cmd = """
                INSERT INTO clin.export_item (
                        description,
                        fk_doc_obj,
                        fk_identity,
                        data,
                        filename
                ) VALUES (
                        gm.nullify_empty_string(%(desc)s),
                        %(pk_obj)s,
                        %(pk_pat)s,
                        (CASE
                                WHEN %(pk_obj)s IS NULL THEN %(fname)s::bytea
                                ELSE NULL::bytea
                        END),
                        (CASE
                                WHEN %(pk_obj)s IS NULL THEN gm.nullify_empty_string(%(fname)s)
                                ELSE NULL
                        END)
                )
                RETURNING pk
        """
        rows = gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}], return_data = True)
        return cExportItem(aPK_obj = rows[0]['pk'])

#------------------------------------------------------------
def delete_export_item(pk_export_item=None):
        args = {'pk': pk_export_item}
        cmd = "DELETE FROM clin.export_item WHERE pk = %(pk)s"
        gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
        return True

#============================================================
#============================================================
_FRONTPAGE_HTML_CONTENT = """<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
       "https://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="content-type" content="text/html; charset=UTF-8">
<link rel="icon" type="image/x-icon" href="gnumed.ico">
<title>%(html_title_header)s %(html_title_patient)s</title>
</head>
<body>

<h1>%(title)s</h1>

<p>
(%(date)s)<br>
</p>

<h2><a href="patient.vcf">Patient</a></h2>

<p>
        %(pat_name)s<br>
        %(pat_dob)s
</p>

<p><img src="%(mugshot_url)s" alt="%(mugshot_alt)s" title="%(mugshot_title)s" width="200" border="2"></p>

<h2>%(docs_title)s</h2>

<ul>
        <li><a href="./">%(browse_root)s</a></li>
        <li><a href="%(doc_subdir)s/">%(browse_docs)s</a></li>
        %(browse_dicomdir)s
        %(run_dicom_viewer)s
</ul>

<ul>
%(docs_list)s
</ul>

<h2><a href="praxis.vcf">Praxis</a></h2>

<p>
%(branch)s @ %(praxis)s
%(adr)s
</p>

<p>(<a href="https://www.gnumed.de">GNUmed</a> version %(gm_ver)s)</p>

</body>
</html>
"""

_INDEX_HTML_CONTENT = """<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
       "https://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="content-type" content="text/html; charset=UTF-8">
<link rel="icon" type="image/x-icon" href="gnumed.ico">
<title>%(html_title_header)s</title>
</head>
<body>

<h1>%(title)s</h1>

<p>
(%(date)s)<br>
</p>

This is an encrypted patient data excerpt created by the GNUmed Electronic Medical Record.

<p>
For decryption you will need to

<ul>
        <li>install decryption software and</li>
        <li>obtain relevant passwords from the creator or holder of this media</li>
</ul>

<h2>Decryption software</h2>

For files ending in

<ul>
        <li>.asc: install <a href="https://gnupg.org">GNU Privacy Guard</a></li>
        <li>.7z: install <a href="https://www.7-zip.org">7-zip</a> or <a href="https://www.winzip.com">WinZip</a></li>
</ul>


<h2>%(docs_title)s</h2>

<ul>
        <li><a href="./frontpage.html">front page (after decryption)</a></li>
        <li><a href="./%(frontpage_fname)s">front page (if decryption from browser is supported)</a></li>

        <li><a href="./">%(browse_root)s</a></li>
        <li><a href="%(doc_subdir)s/">%(browse_docs)s</a></li>
</ul>


<h2><a href="praxis.vcf">Praxis</a></h2>

<p>
%(branch)s @ %(praxis)s
%(adr)s
</p>

<p>(<a href="https://www.gnumed.de">GNUmed</a> version %(gm_ver)s)</p>

</body>
</html>
"""

#------------------------------------------------------------
class cExportArea(object):

        def __init__(self, pk_identity):
                self.__pk_identity = pk_identity

        #--------------------------------------------------------
        def add_form(self, form=None, designation=None):

                if len(form.final_output_filenames) == 0:
                        return True

                items = []
                for fname in form.final_output_filenames:
                        item = self.add_file(filename = fname)
                        if item is None:
                                for prev_item in items:
                                        delete_export_item(pk_export_item = prev_item['pk_export_item'])
                                return False
                        items.append(item)
                        item['description'] = _('form: %s %s (%s)') % (form.template['name_long'], form.template['external_version'], fname)
                        item['designation'] = designation
                        item.save()

                return True

        #--------------------------------------------------------
        def add_forms(self, forms=None, designation=None):
                all_ok = True
                for form in forms:
                        all_ok = all_ok and self.add_form(form = form, designation = designation)

                return all_ok

        #--------------------------------------------------------
        def add_path(self, path, comment=None):
                """Add a DIR entry to the export area.

                This sort of entry points to a certain directory on a
                certain machine. The content of the the directory
                will be included in exports, the directory *itself*
                will not. For *that*, use a disposable top-level
                directory into which you put the directory to include
                as a subdirectory.
                """
                assert (os.path.isdir(path)), '<path> must exist: %s' % path

                path_item_data = 'DIR::%s::%s/' % (platform.node(), path.rstrip('/'))
                _log.debug('attempting to add path item [%s]', path_item_data)
                item = self.path_item_exists(path_item_data)
                if item is not None:
                        _log.debug('[%s] already in export area', path)
                        return item

                if comment is None:
                        comment = _('path [%s/] on computer "%s"') % (
                                path.rstrip('/'),
                                platform.node()
                        )
                else:
                        comment += _(' (on "%s")') % platform.node()

                item = create_export_item (
                        description = comment,
                        pk_identity = self.__pk_identity,
                        filename = path_item_data
                )
                try:
                        README = open(os.path.join(path, DIRENTRY_README_NAME), mode = 'wt', encoding = 'utf8')
                        README.write('GNUmed DIRENTRY information\n')
                        README.write('created: %s\n' % gmDateTime.pydt_now_here())
                        README.write('machine: %s\n' % platform.node())
                        README.write('path: %s\n' % path)
                        README.close()
                except OSError:
                        _log.exception('READONLY DIRENTRY [%s]', path)

                return item

        #--------------------------------------------------------
        def path_item_exists(self, path_item_data):

                assert (path_item_data.startswith('DIR::')), 'invalid <path_item_data> [%s]' % path_item_data

                where_parts = [
                        'pk_identity = %(pat)s',
                        'filename = %(fname)s',
                        'pk_doc_obj IS NULL'
                ]
                args = {
                        'pat': self.__pk_identity,
                        'fname': path_item_data
                }
                SQL = _SQL_get_export_items % ' AND '.join(where_parts)
                rows = gmPG2.run_ro_queries(queries = [{'cmd': SQL, 'args': args}])
                if len(rows) == 0:
                        return None

                r = rows[0]
                return cExportItem(row = {'data': r, 'pk_field': 'pk_export_item'})

        #--------------------------------------------------------
        def add_file(self, filename=None, hint=None):
                try:
                        open(filename).close()
                except Exception:
                        _log.exception('cannot open file <%s>', filename)
                        return None

                file_md5 = gmTools.file2md5(filename = filename, return_hex = True)
                existing_item = self.md5_exists(md5 = file_md5, include_document_parts = False)
                if existing_item is not None:
                        _log.debug('md5 match (%s): %s already in export area', file_md5, filename)
                        return existing_item

                path, basename = os.path.split(filename)
                item = create_export_item (
                        description = '%s: %s (%s/)' % (
                                gmTools.coalesce(hint, _('file'), '%s'),
                                basename,
                                path
                        ),
                        pk_identity = self.__pk_identity,
                        filename = filename
                )

                if item.update_data_from_file(filename = filename):
                        return item

                # failed to insert data, hence remove export item entry
                delete_export_item(pk_export_item = item['pk_export_item'])
                return None

        #--------------------------------------------------------
        def add_files(self, filenames=None, hint=None):
                all_ok = True
                for fname in filenames:
                        all_ok = all_ok and (self.add_file(filename = fname, hint = hint) is not None)

                return all_ok

        #--------------------------------------------------------
        def add_documents(self, documents:list=None) -> None:
                for doc in documents:
                        doc_tag = _('%s (%s)%s') % (
                                doc['l10n_type'],
                                gmDateTime.pydt_strftime(doc['clin_when'], '%Y %b %d'),
                                gmTools.coalesce(doc['comment'], '', ' "%s"')
                        )
                        for obj in doc.parts:
                                if self.document_part_item_exists(pk_part = obj['pk_obj']):
                                        continue
                                f_ext = ''
                                if obj['filename'] is not None:
                                        f_ext = os.path.splitext(obj['filename'])[1].strip('.').strip()
                                if f_ext != '':
                                        f_ext = ' .' + f_ext.upper()
                                obj_tag = _('part %s (%s%s)%s') % (
                                        obj['seq_idx'],
                                        gmTools.size2str(obj['size']),
                                        f_ext,
                                        gmTools.coalesce(obj['obj_comment'], '', ' "%s"')
                                )
                                create_export_item (
                                        description = '%s - %s' % (doc_tag, obj_tag),
                                        pk_doc_obj = obj['pk_obj']
                                )

        #--------------------------------------------------------
        def document_part_item_exists(self, pk_part=None):
                cmd = "SELECT EXISTS (SELECT 1 FROM clin.export_item WHERE fk_doc_obj = %(pk_obj)s)"
                args = {'pk_obj': pk_part}
                rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])
                return rows[0][0]

        #--------------------------------------------------------
        def md5_exists(self, md5=None, include_document_parts=False):
                where_parts = [
                        'pk_identity = %(pat)s',
                        'md5_sum = %(md5)s'
                ]
                args = {
                        'pat': self.__pk_identity,
                        'md5': md5
                }

                if not include_document_parts:
                        where_parts.append('pk_doc_obj IS NULL')

                cmd = _SQL_get_export_items % ' AND '.join(where_parts)
                rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])

                if len(rows) == 0:
                        return None

                r = rows[0]
                return cExportItem(row = {'data': r, 'pk_field': 'pk_export_item'})

        #--------------------------------------------------------
        def remove_item(self, item):
                if item.is_valid_DIRENTRY:
                        gmTools.remove_file(os.path.join(item.DIRENTRY_path, DIRENTRY_README_NAME))
                return delete_export_item(pk_export_item = item['pk_export_item'])

        #--------------------------------------------------------
        def dump_items_to_disk(self, base_dir:str=None, items:list=None, passphrase:str=None, convert2pdf:bool=False, master_passphrase:str=None) -> str:
                """Dump export area items to disk.

                Args:
                        base_dir: directory into which to dump the items, defaults to a new sandbox directory
                        items: which items to dump, defaults to all items
                        passphrase: used to symmetrically encrypt dumped files, if set
                        convert2pdf: attempt to convert items into PDF before dumping them
                        master_passphrase: encrypt passphrase with this password for safekeeping in GNUmed if no pubkeys available

                Returns:
                        The base_dir into which items were dumped, or None.
                """
                if items is None:
                        items = self.items
                if len(items) == 0:
                        return None

                if base_dir is None:
                        base_dir = gmTools.mk_sandbox_dir()
                else:
                        gmTools.mkdir(base_dir)
                _log.debug('dumping export items to: %s', base_dir)
                for item in items:
                        saved_fname = item.save_to_file(directory = base_dir, passphrase = passphrase, convert2pdf = convert2pdf)
                        if saved_fname is None:
                                return None

                        if passphrase:
                                store_passphrase_of_file(filename = saved_fname, passphrase = passphrase, master_passphrase = master_passphrase)

                return base_dir

        #--------------------------------------------------------
        def dump_items_to_disk_as_zip(self, base_dir:str=None, items:list=None, passphrase:str=None, master_passphrase:str=None) -> str:
                """Dump items to disk into a zip archive.

                        Calls dump_items_to_disk().

                Args:
                        base_dir: directory into which to dump the items, defaults to a new sandbox directory
                        items: which items to dump, defaults to all items
                        passphrase: used to symmetrically encrypt dumped files, if set
                        master_passphrase: encrypt passphrase with this password for safekeeping in GNUmed if no pubkeys available

                Returns:
                        Zip archive name, or None.
                """
                _log.debug('target dir: %s', base_dir)
                dump_dir = self.dump_items_to_disk(base_dir = base_dir, items = items)
                if dump_dir is None:
                        _log.error('cannot dump export area items')
                        return None

                if passphrase:
                        zip_file = gmCrypto.create_encrypted_zip_archive_from_dir (
                                source_dir = dump_dir,
                                comment = _('GNUmed Patient Media'),
                                overwrite = True,
                                passphrase = passphrase,
                                verbose = _cfg.get(option = 'debug')
                        )
                        if zip_file:
                                store_passphrase_of_file(filename = zip_file, passphrase = passphrase, master_passphrase = master_passphrase)
                        else:
                                _log.error('cannot zip+encrypt export area items dump')
                        return zip_file

                zip_file = gmCrypto.create_zip_archive_from_dir (
                        dump_dir,
                        comment = _('GNUmed Patient Media'),
                        overwrite = True,
                        verbose = _cfg.get(option = 'debug')
                )
                if not zip_file:
                        _log.error('cannot zip export area items dump')
                return zip_file

        #--------------------------------------------------------
        def export(self, base_dir:str=None, items:list=None, passphrase:str=None, master_passphrase:str=None) -> bool|str:
                """Export items as structured patient media.

                Args:
                        base_dir: directory into which to store the export, defaults to a new sandbox directory based on patient name
                        items: which items to dump, defaults to all items
                        passphrase: used to symmetrically encrypt dumped files, if set
                        master_passphrase: password to encrypt passphrase with for safekeeping, if no public keys available for that

                Returns:
                        Base_dir name, else None or False on failure.
                """
                if items is None:
                        items = self.items
                if len(items) == 0:
                        return None

                from Gnumed.business.gmPerson import cPatient
                pat = cPatient(aPK_obj = self.__pk_identity)
                target_base_dir = base_dir
                if target_base_dir is None:
                        target_sandbox_dir = gmTools.mk_sandbox_dir()
                        target_base_dir = os.path.join(target_sandbox_dir, pat.subdir_name)
                gmTools.mkdir(target_base_dir)
                _log.debug('patient media base dir: %s', target_base_dir)
                if not gmTools.dir_is_empty(target_base_dir):
                        _log.error('patient media base dir is not empty')
                        return False

                from Gnumed.business.gmPraxis import gmCurrentPraxisBranch
                prax = gmCurrentPraxisBranch()

                html_data = {}

                # 1) assemble everything into a sandbox
                # - setup sandbox
                sandbox_dir = gmTools.mk_sandbox_dir()
                _log.debug('sandbox dir: %s', sandbox_dir)
                doc_dir = os.path.join(sandbox_dir, DOCUMENTS_SUBDIR)
                gmTools.mkdir(doc_dir)
                # - export mugshot
                mugshot = pat.document_folder.latest_mugshot
                if mugshot is not None:
                        mugshot_fname = mugshot.save_to_file(directory = doc_dir)
                        fname = os.path.split(mugshot_fname)[1]
                        html_data['mugshot_url'] = os.path.join(DOCUMENTS_SUBDIR, fname)
                        html_data['mugshot_alt'] =_('patient photograph from %s') % gmDateTime.pydt_strftime(mugshot['date_generated'], '%B %Y')
                        html_data['mugshot_title'] = gmDateTime.pydt_strftime(mugshot['date_generated'], '%B %Y')
                # - export patient demographics as GDT/XML/VCF/MCF
                pat.export_as_gdt(filename = os.path.join(sandbox_dir, 'patient.gdt'))
                pat.export_as_xml_linuxmednews(filename = os.path.join(sandbox_dir, 'patient.xml'))
                pat.export_as_vcard(filename = os.path.join(sandbox_dir, 'patient.vcf'))
                pat.export_as_mecard(filename = os.path.join(sandbox_dir, 'patient.mcf'))
                # - create CD.INF
                self._create_cd_inf(pat, sandbox_dir)
                # - export items
                docs_list = []
                for item in items:
                        # if it is a dicomdir - put it into the root of the target media
                        if item.is_DICOM_directory:
                                _log.debug('exporting DICOMDIR DIRENTRY')
                                # save into base dir
                                item_fname = item.save_to_file(directory = sandbox_dir)
                                # do not include into ./documents/ listing
                                continue
                        if item.is_valid_DIRENTRY:
                                _log.debug('exporting DIRENTRY')
                                # if there are files in the root dir: put it into a
                                # subdir of ./documents/ where subdir is the leaf
                                # of the item .filename
                                if item.has_files_in_root:
                                        tag, node, local_fs_path = item['filename'].split('::', 2)
                                        subdir = local_fs_path.rstrip('/').split('/')[-1]
                                        subdir = os.path.join(doc_dir, subdir)
                                        gmTools.mkdir(subdir)
                                        item_fname = item.save_to_file(directory = subdir)
                                # if it is subdirs only - put it into documents/ directly
                                else:
                                        item_fname = item.save_to_file(directory = doc_dir)
                                # include into ./documents/ listing
                                fname = os.path.split(item_fname)[1]
                                docs_list.append([fname, gmTools.html_escape_string(item['description'])])
                                continue
                        if item.is_DIRENTRY:
                                # DIRENTRY but not valid: skip
                                continue
                        # normal entry, doc obj links or data item
                        item_fname = item.save_to_file(directory = doc_dir)
                        fname = os.path.split(item_fname)[1]
                        # collect items so we can later correlate items and descriptions
                        # actually we should link to encrypted items, and to unencrypted ones
                        docs_list.append([fname, gmTools.html_escape_string(item['description'])])
                # - link to DICOMDIR if exists
                if 'DICOMDIR' in os.listdir(sandbox_dir):       # in root path
                        has_dicomdir = True
                        html_data['browse_dicomdir'] = '<li><a href="./DICOMDIR">%s</a></li>' % _('show DICOMDIR file')
                else:
                        has_dicomdir = False
                # - include DWV link if there is a DICOMDIR
                if has_dicomdir:
                        # do not clone into media base to avoid encryption,
                        # DWV will be cloned to there later on
                        dwv_sandbox_dir = self._clone_dwv()
                        if dwv_sandbox_dir is not None:
                                html_data['run_dicom_viewer'] = '<li><a href="./dwv/viewers/mobile-local/index.html">%s</a></li>' % _('run Radiology Images (DICOM) Viewer')
                # - create frontpage.html
                frontpage_fname = self._create_frontpage_html(pat, prax, sandbox_dir, html_data, docs_list)
                # - start.html (just a convenience copy of frontpage.html)
                #   (later overwritten, if encryption is requested)
                start_fname = os.path.join(sandbox_dir, 'start.html')
                try:
                        shutil.copy2(frontpage_fname, start_fname)
                except Exception:
                        _log.exception('cannot copy %s to %s', frontpage_fname, start_fname)
                # - index.html (just a convenience copy of frontpage.html)
                #   (later overwritten if encryption is requested)
                index_fname = os.path.join(sandbox_dir, 'index.html')
                try:
                        shutil.copy2(frontpage_fname, index_fname)
                except Exception:
                        _log.exception('cannot copy %s to %s', frontpage_fname, index_fname)

                # 2) encrypt content of sandbox
                if passphrase is not None:
                        encrypted = gmCrypto.encrypt_directory_content (
                                directory = sandbox_dir,
                                receiver_key_ids = None,
                                passphrase = passphrase,
                                comment = None,
                                verbose = _cfg.get(option = 'debug'),
                                remove_unencrypted = True,
                                master_passphrase = master_passphrase,
                                store_passphrase_cb = store_passphrase_of_file_callback
                        )
                        if not encrypted:
                                _log.error('cannot encrypt data in sandbox dir')
                                return False

                # 3) add never-to-be-encrypted data
                # - AUTORUN.INF
                # - README
                if passphrase:
                        self._create_autorun_inf(None, sandbox_dir)
                        self._create_readme(None, sandbox_dir)
                else:
                        self._create_autorun_inf(pat, sandbox_dir)
                        self._create_readme(pat, sandbox_dir)
                # - praxis VCF/MCF
                shutil.move(prax.vcf, os.path.join(sandbox_dir, 'praxis.vcf'))
                prax.export_as_mecard(filename = os.path.join(sandbox_dir, u'praxis.mcf'))
                # - include DWV code
                if has_dicomdir:
                        self._clone_dwv(target_dir = sandbox_dir)
                # - index.html as boilerplate for decryption
                if passphrase:
                        index_fname = self._create_index_html(prax, sandbox_dir, html_data)

                # 4) move sandbox to target dir
                target_dir = gmTools.copy_tree_content(sandbox_dir, target_base_dir)
                if target_dir is None:
                        _log.error('cannot fill target base dir')
                        return False

                return target_dir

        #--------------------------------------------------------
        def export_as_zip(self, base_dir:str=None, items:list=None, passphrase:str=None, master_passphrase:str=None) -> str:
                _log.debug('target dir: %s', base_dir)
                export_dir = self.export(base_dir = base_dir, items = items)
                if export_dir is None:
                        _log.debug('cannot export items')
                        return None

                if not passphrase:
                        zip_file = gmCrypto.create_zip_archive_from_dir (
                                export_dir,
                                comment = _('GNUmed Patient Media'),
                                overwrite = True,
                                verbose = _cfg.get(option = 'debug')
                        )
                        if not zip_file:
                                _log.debug('cannot create zip archive')
                        return zip_file

                zip_file = gmCrypto.create_encrypted_zip_archive_from_dir (
                        export_dir,
                        comment = _('GNUmed Patient Media'),
                        overwrite = True,
                        passphrase = passphrase,
                        verbose = _cfg.get(option = 'debug')
                )
                if zip_file:
                        store_passphrase_of_file(filename = zip_file, passphrase = passphrase, master_passphrase = master_passphrase)
                else:
                        _log.debug('cannot create zip archive')
                return zip_file

        #--------------------------------------------------------
        def _create_index_html(self, praxis, directory, data):
                # header part
                _HTML_data = {
                        'html_title_header': _('Patient data'),
                        'title': _('Patient data excerpt'),
                        'docs_title': _('Documents'),
                        'browse_root': _('browse storage medium'),
                        'browse_docs': _('browse documents area'),
                        'doc_subdir': DOCUMENTS_SUBDIR,
                        'date' : gmTools.html_escape_string(gmDateTime.pydt_strftime(gmDateTime.pydt_now_here(), format = '%Y %B %d'))
                }
                frontpage_fname_enc = 'frontpage.html.asc'
                if os.path.isfile(os.path.join(directory, frontpage_fname_enc)):
                        _HTML_data['frontpage_fname'] = frontpage_fname_enc
                frontpage_fname_enc = 'frontpage.html.7z'
                if os.path.isfile(os.path.join(directory, frontpage_fname_enc)):
                        _HTML_data['frontpage_fname'] = frontpage_fname_enc
                # footer part
                lines = []
                adr = praxis.branch.org_unit.address
                if adr is not None:
                        lines.extend(adr.format())
                for comm in praxis.branch.org_unit.comm_channels:
                        if comm['is_confidential'] is True:
                                continue
                        lines.append('%s: %s' % (
                                comm['l10n_comm_type'],
                                comm['url']
                        ))
                adr = ''
                if len(lines) > 0:
                        adr = gmTools.html_escape_string('\n'.join(lines), replace_eol = True, keep_visual_eol = True)
                _HTML_data['branch'] = gmTools.html_escape_string(praxis['branch'])
                _HTML_data['praxis'] = gmTools.html_escape_string(praxis['praxis'])
                _HTML_data['gm_ver'] = gmTools.html_escape_string(gmTools.coalesce(_cfg.get(option = 'client_version'), 'git HEAD'))
                _HTML_data['adr'] = adr
                # create file
                index_fname = os.path.join(directory, 'index.html')
                index_file = open(index_fname, mode = 'wt', encoding = 'utf8')
                index_file.write(_INDEX_HTML_CONTENT % _HTML_data)
                index_file.close()
                return index_fname

        #--------------------------------------------------------
        def _create_frontpage_html(self, patient, praxis, directory, data, docs_list):
                # <li><a href="documents/filename-1.ext">document 1 description</a></li>
                _HTML_LIST_ITEM = '     <li><a href="%s">%s</a></li>'
                # header part
                _HTML_data = {
                        'html_title_header': _('Patient data for'),
                        'html_title_patient': gmTools.html_escape_string(patient.get_description_gender(with_nickname = False) + ', ' + _('born') + ' ' + patient.get_formatted_dob('%Y %B %d')),
                        'title': _('Patient data excerpt'),
                        'pat_name': gmTools.html_escape_string(patient.get_description_gender(with_nickname = False)),
                        'pat_dob': gmTools.html_escape_string(_('born') + ' ' + patient.get_formatted_dob('%Y %B %d')),
                        'mugshot_url': 'documents/no-such-file.png',
                        'mugshot_alt': _('no patient photograph available'),
                        'mugshot_title': '',
                        'docs_title': _('Documents'),
                        'browse_root': _('browse storage medium'),
                        'browse_docs': _('browse documents area'),
                        'doc_subdir': DOCUMENTS_SUBDIR,
                        'browse_dicomdir': '',
                        'run_dicom_viewer': '',
                        'date' : gmTools.html_escape_string(gmDateTime.pydt_strftime(gmDateTime.pydt_now_here(), format = '%Y %B %d'))
                }
                for key in data:
                        _HTML_data[key] = data[key]

                # documents part
                _HTML_docs_list = []
                for doc in docs_list:
                        subdir = os.path.join(directory, DOCUMENTS_SUBDIR, doc[0])
                        if os.path.isdir(subdir):
                                _HTML_docs_list.append(_HTML_LIST_ITEM % (os.path.join(DOCUMENTS_SUBDIR, doc[0]), _('DIRECTORY: %s/%s/') % (DOCUMENTS_SUBDIR, doc[0])))
                                _HTML_docs_list.append('        <ul>')
                                for fname in os.listdir(subdir):
                                        tmp = os.path.join(subdir, fname)
                                        if os.path.isdir(tmp):
                                                _HTML_docs_list.append('                <li><a href="%s">%s</a></li>' % (os.path.join(DOCUMENTS_SUBDIR, doc[0], fname), _('DIRECTORY: %s/%s/%s/') % (DOCUMENTS_SUBDIR, doc[0], fname)))
                                        else:
                                                _HTML_docs_list.append('                <li><a href="%s">%s</a></li>' % (os.path.join(DOCUMENTS_SUBDIR, doc[0], fname), fname))
                                _HTML_docs_list.append('        </ul>')
                        else:
                                _HTML_docs_list.append(_HTML_LIST_ITEM % (os.path.join(DOCUMENTS_SUBDIR, doc[0]), doc[1]))
                _HTML_data['docs_list'] = u'\n  '.join(_HTML_docs_list)

                # footer part
                lines = []
                adr = praxis.branch.org_unit.address
                if adr is not None:
                        lines.extend(adr.format())
                for comm in praxis.branch.org_unit.comm_channels:
                        if comm['is_confidential'] is True:
                                continue
                        lines.append('%s: %s' % (
                                comm['l10n_comm_type'],
                                comm['url']
                        ))
                adr = ''
                if len(lines) > 0:
                        adr = gmTools.html_escape_string('\n'.join(lines), replace_eol = True, keep_visual_eol = True)
                _HTML_data['branch'] = gmTools.html_escape_string(praxis['branch'])
                _HTML_data['praxis'] = gmTools.html_escape_string(praxis['praxis'])
                _HTML_data['gm_ver'] = gmTools.html_escape_string(gmTools.coalesce(_cfg.get(option = 'client_version'), 'git HEAD'))
                _HTML_data['adr'] = adr
                # create file
                frontpage_fname = os.path.join(directory, 'frontpage.html')
                frontpage_file = open(frontpage_fname, mode = 'wt', encoding = 'utf8')
                frontpage_file.write(_FRONTPAGE_HTML_CONTENT % _HTML_data)
                frontpage_file.close()
                return frontpage_fname

        #--------------------------------------------------------
        def _clone_dwv(self, target_dir=None):
                _log.debug('cloning dwv')
                # find DWV
                dwv_src_dir = os.path.join(gmTools.gmPaths().local_base_dir, 'resources', 'dwv4export')
                if not os.path.isdir(dwv_src_dir):
                        _log.debug('[%s] not found', dwv_src_dir)
                        dwv_src_dir = os.path.join(gmTools.gmPaths().system_app_data_dir, 'resources', 'dwv4export')
                if not os.path.isdir(dwv_src_dir):
                        _log.debug('[%s] not found', dwv_src_dir)
                        return None
                # clone it
                if target_dir is None:
                        target_dir = gmTools.mk_sandbox_dir()
                dwv_target_dir = os.path.join(target_dir, 'dwv')
                gmTools.rmdir(dwv_target_dir)
                try:
                        shutil.copytree(dwv_src_dir, dwv_target_dir)
                except (shutil.Error, OSError):
                        _log.exception('cannot include DWV, skipping')
                        return None

                return dwv_target_dir

        #--------------------------------------------------------
        def _create_readme(self, patient, directory):
                _README_CONTENT = (
                        'This is a patient data excerpt created by the GNUmed Electronic Medical Record.\n'
                        '\n'
                        'Patient: %s\n'
                        '\n'
                        'Please display <frontpage.html> to browse patient data.\n'
                        '\n'
                        'Individual documents are stored in the subdirectory\n'
                        '\n'
                        '       documents/\n'
                        '\n'
                        '\n'
                        'Data may need to be decrypted with either GNU Privacy\n'
                        'Guard or 7zip/WinZip.\n'
                        '\n'
                        '.asc:\n'
                        '       https://gnupg.org\n'
                        '\n'
                        '.7z:\n'
                        '       https://www.7-zip.org\n'
                        '       https://www.winzip.com\n'
                        '\n'
                        'To obtain any needed keys you will have to get in touch with\n'
                        'the creator or the owner of this patient media excerpt.\n'
                )
                readme_fname = os.path.join(directory, 'README')
                readme_file = open(readme_fname, mode = 'wt', encoding = 'utf8')
                if patient is None:
                        pat_str = _('<protected>')
                else:
                        pat_str = patient.get_description_gender(with_nickname = False) + ', ' + _('born') + ' ' + patient.get_formatted_dob('%Y %B %d')
                readme_file.write(_README_CONTENT % pat_str)
                readme_file.close()
                return readme_fname

        #--------------------------------------------------------
        def _create_cd_inf(self, patient, directory):
                _CD_INF_CONTENT = (
                        '[Patient Info]\r\n'                                    # needs \r\n for Windows
                        'PatientName=%s, %s\r\n'
                        'Gender=%s\r\n'
                        'BirthDate=%s\r\n'
                        'CreationDate=%s\r\n'
                        'PID=%s\r\n'
                        'EMR=GNUmed\r\n'
                        'Version=%s\r\n'
                        '#StudyDate=\r\n'
                        '#VNRInfo=<body part>\r\n'
                        '\r\n'
                        '# name format: lastnames, firstnames\r\n'
                        '# date format: YYYY-MM-DD (ISO 8601)\r\n'
                        '# gender format: %s\r\n'
                )
                fname = os.path.join(directory, 'CD.INF')
                cd_inf = open(fname, mode = 'wt', encoding = 'utf8')
                cd_inf.write(_CD_INF_CONTENT % (
                        patient['lastnames'],
                        patient['firstnames'],
                        gmTools.coalesce(patient['gender'], '?'),
                        patient.get_formatted_dob('%Y-%m-%d'),
                        gmDateTime.pydt_strftime(gmDateTime.pydt_now_here(), format = '%Y-%m-%d'),
                        patient.ID,
                        _cfg.get(option = 'client_version'),
                        ' / '.join([ '%s = %s (%s)' % (g['tag'], g['label'], g['l10n_label']) for g in patient.gender_list ])
                ))
                cd_inf.close()
                return fname

        #--------------------------------------------------------
        def _create_autorun_inf(self, patient, directory):
                _AUTORUN_INF_CONTENT = (                                                        # needs \r\n for Windows
                        '[AutoRun.Amd64]\r\n'                                   # 64 bit
                        'label=%(label)s\r\n'                                   # patient name/DOB
                        'shellexecute=index.html\r\n'
                        'action=%(action)s\r\n'                                 # % _('Browse patient data')
                        '%(icon)s\r\n'                                                  # "icon=gnumed.ico" or ""
                        'UseAutoPlay=1\r\n'
                        '\r\n'
                        '[AutoRun]\r\n'                                                 # 32 bit
                        'label=%(label)s\r\n'                                   # patient name/DOB
                        'shellexecute=index.html\r\n'
                        'action=%(action)s\r\n'                                 # % _('Browse patient data')
                        '%(icon)s\r\n'                                                  # "icon=gnumed.ico" or ""
                        'UseAutoPlay=1\r\n'
                        '\r\n'
                        '[Content]\r\n'
                        'PictureFiles=yes\r\n'
                        'VideoFiles=yes\r\n'
                        'MusicFiles=no\r\n'
                        '\r\n'
                        '[IgnoreContentPaths]\r\n'
                        '\documents\r\n'
                        '\r\n'
                        '[unused]\r\n'
                        'open=requires explicit executable\r\n'
                )
                autorun_dict = {
                        'label': self._compute_autorun_inf_label(patient),
                        'action': _('Browse patient data'),
                        'icon': ''
                }
                media_icon_kwd = '$$gnumed_patient_media_export_icon'
                media_icon_kwd_exp = gmKeywordExpansion.get_expansion (
                        keyword = media_icon_kwd,
                        textual_only = False,
                        binary_only = True
                )
                icon_tmp_fname = media_icon_kwd_exp.save_to_file (
                        target_mime = 'image/x-icon',
                        target_extension = '.ico',
                        ignore_conversion_problems = True
                )
                if icon_tmp_fname is None:
                        _log.error('cannot retrieve <%s>', media_icon_kwd)
                else:
                        media_icon_fname = os.path.join(directory, 'gnumed.ico')
                        try:
                                shutil.copy2(icon_tmp_fname, media_icon_fname)
                                autorun_dict['icon'] = 'icon=gnumed.ico'
                        except Exception:
                                _log.exception('cannot move %s to %s', icon_tmp_fname, media_icon_fname)
                autorun_fname = os.path.join(directory, 'AUTORUN.INF')
                autorun_file = open(autorun_fname, mode = 'wt', encoding = 'cp1252', errors = 'replace')
                autorun_file.write(_AUTORUN_INF_CONTENT % autorun_dict)
                autorun_file.close()
                return autorun_fname

        #--------------------------------------------------------
        def _compute_autorun_inf_label(self, patient):
                if patient is None:
                        # if no patient provided - assume the target media
                        # is to be encrypted and thusly do not expose patient data,
                        # AUTORUN.INF itself will not be encrypted because
                        # Windows must be able to parse it
                        return _('GNUmed patient data excerpt')[:32]

                LABEL_MAX_LEN = 32
                dob = patient.get_formatted_dob(format = ' %Y%m%d', none_string = '', honor_estimation = False)
                if dob == '':
                        gender_template = ' (%s)'
                else:
                        gender_template = ' %s'
                gender = gmTools.coalesce(patient['gender'], '', gender_template)
                name_max_len = LABEL_MAX_LEN - len(gender) - len(dob)                   # they already include appropriate padding
                name = patient.active_name
                last = name['lastnames'].strip()
                first = name['firstnames'].strip()
                len_last = len(last)
                len_first = len(first)
                while (len_last + len_first + 1) > name_max_len:
                        if len_first > 6:
                                len_first -= 1
                                if first[len_first - 1] == ' ':
                                        len_first -= 1
                                continue
                        len_last -= 1
                        if last[len_last - 1] == ' ':
                                len_last -= 1
                last = last[:len_last].strip().upper()
                first = first[:len_first].strip()
                # max 32 chars, supposedly ASCII, but CP1252 likely works pretty well
                label = (('%s %s%s%s' % (last, first, dob,      gender)).strip())[:32]
                return label

        #--------------------------------------------------------
        # properties
        #--------------------------------------------------------
        def get_items(self, designation=None, order_by='pk_identity, list_position, designation, description'):
                return get_export_items(order_by = order_by, pk_identity = self.__pk_identity, designation = designation)

        items = property(get_items)

        #--------------------------------------------------------
        def get_printouts(self, order_by='list_position, designation, description'):
                return get_print_jobs(order_by = order_by, pk_identity = self.__pk_identity)

        printouts = property(get_printouts)

#===========================================================================
# passphrase escrow
#---------------------------------------------------------------------------
def store_passphrase_of_file_callback(filename:str=None, passphrase:str=None, comment:dict=None, master_passphrase:str=None):
        return store_passphrase_of_file (
                filename = filename,
                passphrase = passphrase,
                hash_type = 'sha256',
                comment = comment,
                master_passphrase = master_passphrase
        )

#---------------------------------------------------------------------------
def store_passphrase_of_file(filename:str=None, passphrase:str=None, hash_type:str='sha256', comment:dict=None, master_passphrase:str=None) -> bool:
        """Call store_object_passphrase on a given file."""
        try:
                f = open(filename, 'rb')
        except Exception:
                _log.exception('cannot open [%s]', filename)
                return False

        if comment is None:
                comment = {}
        comment['__filename__'] = gmTools.fname_from_path(filename)
        return store_object_passphrase (
                obj = f,
                passphrase = passphrase,
                hash_type = hash_type,
                comment = comment,
                master_passphrase = master_passphrase
        )

#---------------------------------------------------------------------------
def get_passphrase_trustees_pubkey_files(passphrase_owner:gmStaff.cStaff|gmStaff.gmCurrentProvider=None) -> list[str]:
        """Retrieve public keys of passphrase trustees as files.

        Args:
                passphrase_owner: staff member to encrypt passphrase to (by public key), defaults to current provider

        Returns:
                Possibly empty list of public keys.
        """
        pubkey_files = []
        if not passphrase_owner:
                passphrase_owner = gmStaff.gmCurrentProvider()
        owner_key_file = passphrase_owner.public_key_file
        if owner_key_file:
                pubkey_files.append(owner_key_file)
        else:
                _log.warning('no public key for owner [%s]', passphrase_owner)
        trustee_key_files = gmStaff.get_public_keys_of_passphrase_trustees(as_files = True)
        if trustee_key_files:
                pubkey_files.extend(trustee_key_files)
        else:
                _log.warning('there are no trustee public keys configured')
        if not pubkey_files:
                _log.warning('neither owner nor trustee public keys available')
        return pubkey_files

#---------------------------------------------------------------------------
def store_object_passphrase (
        obj=None,
        passphrase:str=None,
        hash_type:str='sha256',
        passphrase_owner:gmStaff.cStaff|gmStaff.gmCurrentProvider=None,
        comment:dict=None,
        master_passphrase:str=None
) -> bool:
        """Store in the database the (encrypted) passphrase for an object.

        The passphrase is stored encrypted with the public key
        of the owner as well as with any public key configured
        as a passphrase trustee, or else symmetrically encrypted
        with master_passphrase.

        Args:
                obj: an instance supporting the (binary) read protocol
                passphrase: the passphrase to store
                hash_type: the hash to use, defaults to '256'
                passphrase_owner: staff member to encrypt passphrase to (by public key), defaults to current provider
                comment: a structured comment on the object, a hint regarding the encryption method is added
                master_passphrase: when there is no public keys available, use this - if set - for symmetric encryption of the passphrase

        Returns:
                True/False based on success
        """
        assert obj, '<obj> must not be None'
        assert passphrase, '<passphrase> must not be None'

        if hash_type not in hashlib.algorithms_available:
                _log.error('hash type [%s] not available amongst %s', hash_type, hashlib.algorithms_available)
                return False

        if not comment:
                comment = {}
        pubkey_files = get_passphrase_trustees_pubkey_files(passphrase_owner = passphrase_owner)
        if pubkey_files:
                comment['__encryption_method__'] = 'pgp::asymmetric'
        else:
                _log.error('cannot escrow passphrase asymmetrically')
                if not master_passphrase:
                        _log.error('cannot escrow passphrase symmetrically either')
                        return False

        hashor = hashlib.new(hash_type, usedforsecurity = False)
        chunk_size = 5 * 1024 * 1024
        data = obj.read(chunk_size)
        while data:
                hashor.update(data)
                data = obj.read(chunk_size)
        hash_val = hashor.hexdigest()
        encrypted_phrase = gmCrypto.encrypt_data (
                data = passphrase,
                recipient_key_files = pubkey_files,
                comment = '[%s]::%s' % (hash_type, hash_val),
                verbose = _cfg.get(option = 'debug'),
                master_passphrase = master_passphrase
        )
        comment['__encryption_method__'] = encrypted_phrase['method']
        SQL = """INSERT INTO gm.obj_export_passphrase (
                hash_type, hash, phrase, description
        ) VALUES (
                %(hash_type)s,
                %(hash)s,
                %(phrase)s,
                %(desc)s
        )"""
        args = {
                'hash_type': hash_type,
                'hash': hash_val,
                'phrase': encrypted_phrase['data'],
                'desc': comment
        }
        gmPG2.run_rw_queries(queries = [{'cmd': SQL, 'args': args}])
        return True

#---------------------------------------------------------------------------
#---------------------------------------------------------------------------
def get_object_passphrases(hash_value:str=None, order_by:str=None, link_obj=None) -> list[gmPG2._TRow] | None:
        """Retrieve encrypted passphrases.

        Args:
                hash_value: the hash to look up passphrases for, if given

        Returns
                List of rows containing encrypted passphrases, or None.
        """
        WHERE_PARTS = []
        args = {}
        if hash_value:
                WHERE_PARTS.append('hash = %(hash)s')
                args['hash'] = hash_value
        SQL = 'SELECT * FROM gm.obj_export_passphrase'
        if WHERE_PARTS:
                SQL += ' WHERE %s' % ' AND '.join(WHERE_PARTS)
        if order_by:
                SQL += ' ORDER BY %s' % order_by
        rows = gmPG2.run_ro_queries(queries = [{'cmd': SQL, 'args': args}], link_obj = link_obj)
        if not rows:
                return None

        return rows

#---------------------------------------------------------------------------
def save_file_passphrases_into_files() -> list[str] | None:
        try:
                hash_val = input('Please enter the file hash: ')
        except KeyboardInterrupt:
                return None

        return save_object_passphrase_to_file(hash_val)

#---------------------------------------------------------------------------
def save_object_passphrase_to_file(hash_value:str=None) -> list[str] | None:
        """Save encrypted passphrases known for a hash into files.

        Args:
                hash_value: the hash to look up passphrases for

        Returns
                List of files containing encrypted passphrases, or None.
        """
        rows = get_object_passphrases(hash_value = hash_value)
        if not rows:
                return None

        sermon = _(
                'In order to decrypt the paperwork passphrase you will need\n'
                'access to the secret GPG key or the master passphrase,\n'
                'depending on the encryption method used.\n'
                '\n'
                'Note that GNUmed does not itself store the master\n'
                'passphrase in any way, shape, or form.\n'
        )
        phrase_files = []
        for row in rows:
                phrasefile_name = '%s-%s-passphrase.txt' % (row['hash'], row['hash_type'])
                with open(phrasefile_name, mode = 'wt', encoding = 'utf8') as phrasefile:
                        phrasefile.write('Searched database for hash value: %s\n' % hash_value)
                        phrasefile.write('\n')
                        phrasefile.write('Entry found:\n')
                        phrasefile.write(' hash method [%s]\n' % row['hash_type'])
                        phrasefile.write(' hash value [%s]\n' % row['hash'])
                        if row['description']:
                                phrasefile.write('\n')
                                phrasefile.write(' Associated description:\n')
                                for key in row['description']:
                                        phrasefile.write('  %s [%s]\n' % (key, row['description'][key]))
                        phrasefile.write('\n')
                        phrasefile.write(_('Encrypted paperwork passphrase:\n'))
                        phrasefile.write('#********** 8< ****************************************#\n')
                        phrasefile.write(row['phrase'])
                        phrasefile.write('#********** 8< ****************************************#\n')
                        phrasefile.write('\n')
                        phrasefile.write(sermon)
                        phrasefile.write('\n')
                phrase_files.append(phrasefile_name)
        return phrase_files

#============================================================
#============================================================
if __name__ == '__main__':

        if len(sys.argv) < 2:
                sys.exit()

        if sys.argv[1] != 'test':
                sys.exit()

        del _
        from Gnumed.pycommon import gmI18N
        gmI18N.activate_locale()
        gmI18N.install_domain()

        from Gnumed.business import gmPraxis

        #---------------------------------------
        def test_export_items():
#               items = get_export_items()
#               for item in items:
#                       print item.format()
                import random
                create_export_item(description = 'description %s' % random.random(), pk_identity = 12, pk_doc_obj = None, filename = 'dummy.dat')
                items = get_export_items()
                for item in items:
                        print(item.format())
                item['pk_doc_obj'] = 1
                item.save()
                print(item)

        #---------------------------------------
        def test_export_area():
                exp = cExportArea(12)
                #print exp.export_with_meta_data()
                #print exp.items
                #exp.add_file(sys.argv[2])
                #prax = 
                gmPraxis.gmCurrentPraxisBranch(branch = gmPraxis.cPraxisBranch(1))
                #print(prax)
                #print(prax.branch)
                #try:
                #       pwd = sys.argv[2]
                #except IndexError:
                #       pwd = None
                #print(exp.export(passphrase = pwd))
                #for item in exp.items:
                #       print(item)
                #       item['list_position'] = 4
                #       item.save()
                #       print(item)
                #       input()
                print(exp.dump_items_to_disk())

        #---------------------------------------
        def test_label():

                from Gnumed.business.gmPerson import cPatient
                #from Gnumed.business.gmPersonSearch import ask_for_patient

                #while ask_for_patient() is not None:
                pat_min = 1
                pat_max = 100
                try:
                        pat_min = int(sys.argv[2])
                        pat_max = int(sys.argv[3])
                except Exception:
                        pass
                cPatient(aPK_obj = pat_min)
                f = open('x-auto_inf_labels.txt', mode = 'w', encoding = 'utf8')
                f.write('--------------------------------\n')
                f.write('12345678901234567890123456789012\n')
                f.write('--------------------------------\n')
                for pat_id in range(pat_min, pat_max):
                        try:
                                exp_area = cExportArea(pat_id)
                                pat = cPatient(aPK_obj = pat_id)
                        except Exception:
                                continue
                        f.write(exp_area._compute_autorun_inf_label(pat) + '\n')
                f.close()
                return

        #---------------------------------------
        def test_store_passphrase_of_file():
                gmStaff.set_current_provider_to_logged_on_user()
                print('file:', sys.argv[2])
                for h_t in [None, 'md5', 'sha256', 'ripemd160', 'invalid_hash_type']:
                        print(h_t, '-', store_passphrase_of_file(filename = sys.argv[2], passphrase='12345', hash_type = h_t))

        #---------------------------------------
        def test_save_object_passphrase_to_file():
                print(save_object_passphrase_to_file(hash_value = sys.argv[2]))

        #---------------------------------------
        #test_export_items()

        gmPG2.request_login_params(setup_pool = True)
        #test_export_area()
        #test_label()
        test_store_passphrase_of_file()
        #test_save_object_passphrase_to_file()

#============================================================
# CDROM "run.bat":
#
#@echo off
#
#if defined ProgramFiles(x86) (
#    ::64-bit
#    start /B x64\mdicom.exe /scan .
#) else (
#    ::32-bit
#    start /B win32\mdicom.exe /scan .
#)
#
#--------------------------------------------------

Functions

def create_export_item(description=None, pk_identity=None, pk_doc_obj=None, filename=None)
Expand source code
def create_export_item(description=None, pk_identity=None, pk_doc_obj=None, filename=None):

        args = {
                'desc': description,
                'pk_obj': pk_doc_obj,
                'pk_pat': pk_identity,
                'fname': filename
        }
        cmd = """
                INSERT INTO clin.export_item (
                        description,
                        fk_doc_obj,
                        fk_identity,
                        data,
                        filename
                ) VALUES (
                        gm.nullify_empty_string(%(desc)s),
                        %(pk_obj)s,
                        %(pk_pat)s,
                        (CASE
                                WHEN %(pk_obj)s IS NULL THEN %(fname)s::bytea
                                ELSE NULL::bytea
                        END),
                        (CASE
                                WHEN %(pk_obj)s IS NULL THEN gm.nullify_empty_string(%(fname)s)
                                ELSE NULL
                        END)
                )
                RETURNING pk
        """
        rows = gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}], return_data = True)
        return cExportItem(aPK_obj = rows[0]['pk'])
def delete_export_item(pk_export_item=None)
Expand source code
def delete_export_item(pk_export_item=None):
        args = {'pk': pk_export_item}
        cmd = "DELETE FROM clin.export_item WHERE pk = %(pk)s"
        gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
        return True
def get_export_items(order_by=None, pk_identity=None, designation=None, return_pks=False)
Expand source code
def get_export_items(order_by=None, pk_identity=None, designation=None, return_pks=False):

        args = {
                'pat': pk_identity,
                'desig': gmTools.coalesce(designation, PRINT_JOB_DESIGNATION)
        }
        where_parts = []
        if pk_identity is not None:
                where_parts.append('pk_identity = %(pat)s')
                # note that invalidly linked items will be
                # auto-healed when instantiated
        if designation is None:
                where_parts.append("designation IS DISTINCT FROM %(desig)s")
        else:
                where_parts.append('designation = %(desig)s')
        if order_by is None:
                order_by = 'pk_identity, list_position'
        order_by = ' ORDER BY %s' % order_by
        cmd = (_SQL_get_export_items % ' AND '.join(where_parts)) + order_by
        rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])
        if return_pks:
                return [ r['pk_export_item'] for r in rows ]

        return [ cExportItem(row = {'data': r, 'pk_field': 'pk_export_item'}) for r in rows ]
def get_object_passphrases(hash_value: str = None, order_by: str = None, link_obj=None) ‑> list[psycopg2.extras.DictRow] | None

Retrieve encrypted passphrases.

Args

hash_value
the hash to look up passphrases for, if given

Returns List of rows containing encrypted passphrases, or None.

Expand source code
def get_object_passphrases(hash_value:str=None, order_by:str=None, link_obj=None) -> list[gmPG2._TRow] | None:
        """Retrieve encrypted passphrases.

        Args:
                hash_value: the hash to look up passphrases for, if given

        Returns
                List of rows containing encrypted passphrases, or None.
        """
        WHERE_PARTS = []
        args = {}
        if hash_value:
                WHERE_PARTS.append('hash = %(hash)s')
                args['hash'] = hash_value
        SQL = 'SELECT * FROM gm.obj_export_passphrase'
        if WHERE_PARTS:
                SQL += ' WHERE %s' % ' AND '.join(WHERE_PARTS)
        if order_by:
                SQL += ' ORDER BY %s' % order_by
        rows = gmPG2.run_ro_queries(queries = [{'cmd': SQL, 'args': args}], link_obj = link_obj)
        if not rows:
                return None

        return rows
def get_passphrase_trustees_pubkey_files(passphrase_owner: cStaff | gmCurrentProvider = None) ‑> list[str]

Retrieve public keys of passphrase trustees as files.

Args

passphrase_owner
staff member to encrypt passphrase to (by public key), defaults to current provider

Returns

Possibly empty list of public keys.

Expand source code
def get_passphrase_trustees_pubkey_files(passphrase_owner:gmStaff.cStaff|gmStaff.gmCurrentProvider=None) -> list[str]:
        """Retrieve public keys of passphrase trustees as files.

        Args:
                passphrase_owner: staff member to encrypt passphrase to (by public key), defaults to current provider

        Returns:
                Possibly empty list of public keys.
        """
        pubkey_files = []
        if not passphrase_owner:
                passphrase_owner = gmStaff.gmCurrentProvider()
        owner_key_file = passphrase_owner.public_key_file
        if owner_key_file:
                pubkey_files.append(owner_key_file)
        else:
                _log.warning('no public key for owner [%s]', passphrase_owner)
        trustee_key_files = gmStaff.get_public_keys_of_passphrase_trustees(as_files = True)
        if trustee_key_files:
                pubkey_files.extend(trustee_key_files)
        else:
                _log.warning('there are no trustee public keys configured')
        if not pubkey_files:
                _log.warning('neither owner nor trustee public keys available')
        return pubkey_files
def get_print_jobs(order_by=None, pk_identity=None)
Expand source code
def get_print_jobs(order_by=None, pk_identity=None):
        return get_export_items(order_by = order_by, pk_identity = pk_identity, designation = PRINT_JOB_DESIGNATION)
def save_file_passphrases_into_files() ‑> list[str] | None
Expand source code
def save_file_passphrases_into_files() -> list[str] | None:
        try:
                hash_val = input('Please enter the file hash: ')
        except KeyboardInterrupt:
                return None

        return save_object_passphrase_to_file(hash_val)
def save_object_passphrase_to_file(hash_value: str = None) ‑> list[str] | None

Save encrypted passphrases known for a hash into files.

Args

hash_value
the hash to look up passphrases for

Returns List of files containing encrypted passphrases, or None.

Expand source code
def save_object_passphrase_to_file(hash_value:str=None) -> list[str] | None:
        """Save encrypted passphrases known for a hash into files.

        Args:
                hash_value: the hash to look up passphrases for

        Returns
                List of files containing encrypted passphrases, or None.
        """
        rows = get_object_passphrases(hash_value = hash_value)
        if not rows:
                return None

        sermon = _(
                'In order to decrypt the paperwork passphrase you will need\n'
                'access to the secret GPG key or the master passphrase,\n'
                'depending on the encryption method used.\n'
                '\n'
                'Note that GNUmed does not itself store the master\n'
                'passphrase in any way, shape, or form.\n'
        )
        phrase_files = []
        for row in rows:
                phrasefile_name = '%s-%s-passphrase.txt' % (row['hash'], row['hash_type'])
                with open(phrasefile_name, mode = 'wt', encoding = 'utf8') as phrasefile:
                        phrasefile.write('Searched database for hash value: %s\n' % hash_value)
                        phrasefile.write('\n')
                        phrasefile.write('Entry found:\n')
                        phrasefile.write(' hash method [%s]\n' % row['hash_type'])
                        phrasefile.write(' hash value [%s]\n' % row['hash'])
                        if row['description']:
                                phrasefile.write('\n')
                                phrasefile.write(' Associated description:\n')
                                for key in row['description']:
                                        phrasefile.write('  %s [%s]\n' % (key, row['description'][key]))
                        phrasefile.write('\n')
                        phrasefile.write(_('Encrypted paperwork passphrase:\n'))
                        phrasefile.write('#********** 8< ****************************************#\n')
                        phrasefile.write(row['phrase'])
                        phrasefile.write('#********** 8< ****************************************#\n')
                        phrasefile.write('\n')
                        phrasefile.write(sermon)
                        phrasefile.write('\n')
                phrase_files.append(phrasefile_name)
        return phrase_files
def store_object_passphrase(obj=None, passphrase: str = None, hash_type: str = 'sha256', passphrase_owner: cStaff | gmCurrentProvider = None, comment: dict = None, master_passphrase: str = None) ‑> bool

Store in the database the (encrypted) passphrase for an object.

The passphrase is stored encrypted with the public key of the owner as well as with any public key configured as a passphrase trustee, or else symmetrically encrypted with master_passphrase.

Args

obj
an instance supporting the (binary) read protocol
passphrase
the passphrase to store
hash_type
the hash to use, defaults to '256'
passphrase_owner
staff member to encrypt passphrase to (by public key), defaults to current provider
comment
a structured comment on the object, a hint regarding the encryption method is added
master_passphrase
when there is no public keys available, use this - if set - for symmetric encryption of the passphrase

Returns

True/False based on success

Expand source code
def store_object_passphrase (
        obj=None,
        passphrase:str=None,
        hash_type:str='sha256',
        passphrase_owner:gmStaff.cStaff|gmStaff.gmCurrentProvider=None,
        comment:dict=None,
        master_passphrase:str=None
) -> bool:
        """Store in the database the (encrypted) passphrase for an object.

        The passphrase is stored encrypted with the public key
        of the owner as well as with any public key configured
        as a passphrase trustee, or else symmetrically encrypted
        with master_passphrase.

        Args:
                obj: an instance supporting the (binary) read protocol
                passphrase: the passphrase to store
                hash_type: the hash to use, defaults to '256'
                passphrase_owner: staff member to encrypt passphrase to (by public key), defaults to current provider
                comment: a structured comment on the object, a hint regarding the encryption method is added
                master_passphrase: when there is no public keys available, use this - if set - for symmetric encryption of the passphrase

        Returns:
                True/False based on success
        """
        assert obj, '<obj> must not be None'
        assert passphrase, '<passphrase> must not be None'

        if hash_type not in hashlib.algorithms_available:
                _log.error('hash type [%s] not available amongst %s', hash_type, hashlib.algorithms_available)
                return False

        if not comment:
                comment = {}
        pubkey_files = get_passphrase_trustees_pubkey_files(passphrase_owner = passphrase_owner)
        if pubkey_files:
                comment['__encryption_method__'] = 'pgp::asymmetric'
        else:
                _log.error('cannot escrow passphrase asymmetrically')
                if not master_passphrase:
                        _log.error('cannot escrow passphrase symmetrically either')
                        return False

        hashor = hashlib.new(hash_type, usedforsecurity = False)
        chunk_size = 5 * 1024 * 1024
        data = obj.read(chunk_size)
        while data:
                hashor.update(data)
                data = obj.read(chunk_size)
        hash_val = hashor.hexdigest()
        encrypted_phrase = gmCrypto.encrypt_data (
                data = passphrase,
                recipient_key_files = pubkey_files,
                comment = '[%s]::%s' % (hash_type, hash_val),
                verbose = _cfg.get(option = 'debug'),
                master_passphrase = master_passphrase
        )
        comment['__encryption_method__'] = encrypted_phrase['method']
        SQL = """INSERT INTO gm.obj_export_passphrase (
                hash_type, hash, phrase, description
        ) VALUES (
                %(hash_type)s,
                %(hash)s,
                %(phrase)s,
                %(desc)s
        )"""
        args = {
                'hash_type': hash_type,
                'hash': hash_val,
                'phrase': encrypted_phrase['data'],
                'desc': comment
        }
        gmPG2.run_rw_queries(queries = [{'cmd': SQL, 'args': args}])
        return True
def store_passphrase_of_file(filename: str = None, passphrase: str = None, hash_type: str = 'sha256', comment: dict = None, master_passphrase: str = None) ‑> bool

Call store_object_passphrase on a given file.

Expand source code
def store_passphrase_of_file(filename:str=None, passphrase:str=None, hash_type:str='sha256', comment:dict=None, master_passphrase:str=None) -> bool:
        """Call store_object_passphrase on a given file."""
        try:
                f = open(filename, 'rb')
        except Exception:
                _log.exception('cannot open [%s]', filename)
                return False

        if comment is None:
                comment = {}
        comment['__filename__'] = gmTools.fname_from_path(filename)
        return store_object_passphrase (
                obj = f,
                passphrase = passphrase,
                hash_type = hash_type,
                comment = comment,
                master_passphrase = master_passphrase
        )
def store_passphrase_of_file_callback(filename: str = None, passphrase: str = None, comment: dict = None, master_passphrase: str = None)
Expand source code
def store_passphrase_of_file_callback(filename:str=None, passphrase:str=None, comment:dict=None, master_passphrase:str=None):
        return store_passphrase_of_file (
                filename = filename,
                passphrase = passphrase,
                hash_type = 'sha256',
                comment = comment,
                master_passphrase = master_passphrase
        )

Classes

class cExportArea (pk_identity)
Expand source code
class cExportArea(object):

        def __init__(self, pk_identity):
                self.__pk_identity = pk_identity

        #--------------------------------------------------------
        def add_form(self, form=None, designation=None):

                if len(form.final_output_filenames) == 0:
                        return True

                items = []
                for fname in form.final_output_filenames:
                        item = self.add_file(filename = fname)
                        if item is None:
                                for prev_item in items:
                                        delete_export_item(pk_export_item = prev_item['pk_export_item'])
                                return False
                        items.append(item)
                        item['description'] = _('form: %s %s (%s)') % (form.template['name_long'], form.template['external_version'], fname)
                        item['designation'] = designation
                        item.save()

                return True

        #--------------------------------------------------------
        def add_forms(self, forms=None, designation=None):
                all_ok = True
                for form in forms:
                        all_ok = all_ok and self.add_form(form = form, designation = designation)

                return all_ok

        #--------------------------------------------------------
        def add_path(self, path, comment=None):
                """Add a DIR entry to the export area.

                This sort of entry points to a certain directory on a
                certain machine. The content of the the directory
                will be included in exports, the directory *itself*
                will not. For *that*, use a disposable top-level
                directory into which you put the directory to include
                as a subdirectory.
                """
                assert (os.path.isdir(path)), '<path> must exist: %s' % path

                path_item_data = 'DIR::%s::%s/' % (platform.node(), path.rstrip('/'))
                _log.debug('attempting to add path item [%s]', path_item_data)
                item = self.path_item_exists(path_item_data)
                if item is not None:
                        _log.debug('[%s] already in export area', path)
                        return item

                if comment is None:
                        comment = _('path [%s/] on computer "%s"') % (
                                path.rstrip('/'),
                                platform.node()
                        )
                else:
                        comment += _(' (on "%s")') % platform.node()

                item = create_export_item (
                        description = comment,
                        pk_identity = self.__pk_identity,
                        filename = path_item_data
                )
                try:
                        README = open(os.path.join(path, DIRENTRY_README_NAME), mode = 'wt', encoding = 'utf8')
                        README.write('GNUmed DIRENTRY information\n')
                        README.write('created: %s\n' % gmDateTime.pydt_now_here())
                        README.write('machine: %s\n' % platform.node())
                        README.write('path: %s\n' % path)
                        README.close()
                except OSError:
                        _log.exception('READONLY DIRENTRY [%s]', path)

                return item

        #--------------------------------------------------------
        def path_item_exists(self, path_item_data):

                assert (path_item_data.startswith('DIR::')), 'invalid <path_item_data> [%s]' % path_item_data

                where_parts = [
                        'pk_identity = %(pat)s',
                        'filename = %(fname)s',
                        'pk_doc_obj IS NULL'
                ]
                args = {
                        'pat': self.__pk_identity,
                        'fname': path_item_data
                }
                SQL = _SQL_get_export_items % ' AND '.join(where_parts)
                rows = gmPG2.run_ro_queries(queries = [{'cmd': SQL, 'args': args}])
                if len(rows) == 0:
                        return None

                r = rows[0]
                return cExportItem(row = {'data': r, 'pk_field': 'pk_export_item'})

        #--------------------------------------------------------
        def add_file(self, filename=None, hint=None):
                try:
                        open(filename).close()
                except Exception:
                        _log.exception('cannot open file <%s>', filename)
                        return None

                file_md5 = gmTools.file2md5(filename = filename, return_hex = True)
                existing_item = self.md5_exists(md5 = file_md5, include_document_parts = False)
                if existing_item is not None:
                        _log.debug('md5 match (%s): %s already in export area', file_md5, filename)
                        return existing_item

                path, basename = os.path.split(filename)
                item = create_export_item (
                        description = '%s: %s (%s/)' % (
                                gmTools.coalesce(hint, _('file'), '%s'),
                                basename,
                                path
                        ),
                        pk_identity = self.__pk_identity,
                        filename = filename
                )

                if item.update_data_from_file(filename = filename):
                        return item

                # failed to insert data, hence remove export item entry
                delete_export_item(pk_export_item = item['pk_export_item'])
                return None

        #--------------------------------------------------------
        def add_files(self, filenames=None, hint=None):
                all_ok = True
                for fname in filenames:
                        all_ok = all_ok and (self.add_file(filename = fname, hint = hint) is not None)

                return all_ok

        #--------------------------------------------------------
        def add_documents(self, documents:list=None) -> None:
                for doc in documents:
                        doc_tag = _('%s (%s)%s') % (
                                doc['l10n_type'],
                                gmDateTime.pydt_strftime(doc['clin_when'], '%Y %b %d'),
                                gmTools.coalesce(doc['comment'], '', ' "%s"')
                        )
                        for obj in doc.parts:
                                if self.document_part_item_exists(pk_part = obj['pk_obj']):
                                        continue
                                f_ext = ''
                                if obj['filename'] is not None:
                                        f_ext = os.path.splitext(obj['filename'])[1].strip('.').strip()
                                if f_ext != '':
                                        f_ext = ' .' + f_ext.upper()
                                obj_tag = _('part %s (%s%s)%s') % (
                                        obj['seq_idx'],
                                        gmTools.size2str(obj['size']),
                                        f_ext,
                                        gmTools.coalesce(obj['obj_comment'], '', ' "%s"')
                                )
                                create_export_item (
                                        description = '%s - %s' % (doc_tag, obj_tag),
                                        pk_doc_obj = obj['pk_obj']
                                )

        #--------------------------------------------------------
        def document_part_item_exists(self, pk_part=None):
                cmd = "SELECT EXISTS (SELECT 1 FROM clin.export_item WHERE fk_doc_obj = %(pk_obj)s)"
                args = {'pk_obj': pk_part}
                rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])
                return rows[0][0]

        #--------------------------------------------------------
        def md5_exists(self, md5=None, include_document_parts=False):
                where_parts = [
                        'pk_identity = %(pat)s',
                        'md5_sum = %(md5)s'
                ]
                args = {
                        'pat': self.__pk_identity,
                        'md5': md5
                }

                if not include_document_parts:
                        where_parts.append('pk_doc_obj IS NULL')

                cmd = _SQL_get_export_items % ' AND '.join(where_parts)
                rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])

                if len(rows) == 0:
                        return None

                r = rows[0]
                return cExportItem(row = {'data': r, 'pk_field': 'pk_export_item'})

        #--------------------------------------------------------
        def remove_item(self, item):
                if item.is_valid_DIRENTRY:
                        gmTools.remove_file(os.path.join(item.DIRENTRY_path, DIRENTRY_README_NAME))
                return delete_export_item(pk_export_item = item['pk_export_item'])

        #--------------------------------------------------------
        def dump_items_to_disk(self, base_dir:str=None, items:list=None, passphrase:str=None, convert2pdf:bool=False, master_passphrase:str=None) -> str:
                """Dump export area items to disk.

                Args:
                        base_dir: directory into which to dump the items, defaults to a new sandbox directory
                        items: which items to dump, defaults to all items
                        passphrase: used to symmetrically encrypt dumped files, if set
                        convert2pdf: attempt to convert items into PDF before dumping them
                        master_passphrase: encrypt passphrase with this password for safekeeping in GNUmed if no pubkeys available

                Returns:
                        The base_dir into which items were dumped, or None.
                """
                if items is None:
                        items = self.items
                if len(items) == 0:
                        return None

                if base_dir is None:
                        base_dir = gmTools.mk_sandbox_dir()
                else:
                        gmTools.mkdir(base_dir)
                _log.debug('dumping export items to: %s', base_dir)
                for item in items:
                        saved_fname = item.save_to_file(directory = base_dir, passphrase = passphrase, convert2pdf = convert2pdf)
                        if saved_fname is None:
                                return None

                        if passphrase:
                                store_passphrase_of_file(filename = saved_fname, passphrase = passphrase, master_passphrase = master_passphrase)

                return base_dir

        #--------------------------------------------------------
        def dump_items_to_disk_as_zip(self, base_dir:str=None, items:list=None, passphrase:str=None, master_passphrase:str=None) -> str:
                """Dump items to disk into a zip archive.

                        Calls dump_items_to_disk().

                Args:
                        base_dir: directory into which to dump the items, defaults to a new sandbox directory
                        items: which items to dump, defaults to all items
                        passphrase: used to symmetrically encrypt dumped files, if set
                        master_passphrase: encrypt passphrase with this password for safekeeping in GNUmed if no pubkeys available

                Returns:
                        Zip archive name, or None.
                """
                _log.debug('target dir: %s', base_dir)
                dump_dir = self.dump_items_to_disk(base_dir = base_dir, items = items)
                if dump_dir is None:
                        _log.error('cannot dump export area items')
                        return None

                if passphrase:
                        zip_file = gmCrypto.create_encrypted_zip_archive_from_dir (
                                source_dir = dump_dir,
                                comment = _('GNUmed Patient Media'),
                                overwrite = True,
                                passphrase = passphrase,
                                verbose = _cfg.get(option = 'debug')
                        )
                        if zip_file:
                                store_passphrase_of_file(filename = zip_file, passphrase = passphrase, master_passphrase = master_passphrase)
                        else:
                                _log.error('cannot zip+encrypt export area items dump')
                        return zip_file

                zip_file = gmCrypto.create_zip_archive_from_dir (
                        dump_dir,
                        comment = _('GNUmed Patient Media'),
                        overwrite = True,
                        verbose = _cfg.get(option = 'debug')
                )
                if not zip_file:
                        _log.error('cannot zip export area items dump')
                return zip_file

        #--------------------------------------------------------
        def export(self, base_dir:str=None, items:list=None, passphrase:str=None, master_passphrase:str=None) -> bool|str:
                """Export items as structured patient media.

                Args:
                        base_dir: directory into which to store the export, defaults to a new sandbox directory based on patient name
                        items: which items to dump, defaults to all items
                        passphrase: used to symmetrically encrypt dumped files, if set
                        master_passphrase: password to encrypt passphrase with for safekeeping, if no public keys available for that

                Returns:
                        Base_dir name, else None or False on failure.
                """
                if items is None:
                        items = self.items
                if len(items) == 0:
                        return None

                from Gnumed.business.gmPerson import cPatient
                pat = cPatient(aPK_obj = self.__pk_identity)
                target_base_dir = base_dir
                if target_base_dir is None:
                        target_sandbox_dir = gmTools.mk_sandbox_dir()
                        target_base_dir = os.path.join(target_sandbox_dir, pat.subdir_name)
                gmTools.mkdir(target_base_dir)
                _log.debug('patient media base dir: %s', target_base_dir)
                if not gmTools.dir_is_empty(target_base_dir):
                        _log.error('patient media base dir is not empty')
                        return False

                from Gnumed.business.gmPraxis import gmCurrentPraxisBranch
                prax = gmCurrentPraxisBranch()

                html_data = {}

                # 1) assemble everything into a sandbox
                # - setup sandbox
                sandbox_dir = gmTools.mk_sandbox_dir()
                _log.debug('sandbox dir: %s', sandbox_dir)
                doc_dir = os.path.join(sandbox_dir, DOCUMENTS_SUBDIR)
                gmTools.mkdir(doc_dir)
                # - export mugshot
                mugshot = pat.document_folder.latest_mugshot
                if mugshot is not None:
                        mugshot_fname = mugshot.save_to_file(directory = doc_dir)
                        fname = os.path.split(mugshot_fname)[1]
                        html_data['mugshot_url'] = os.path.join(DOCUMENTS_SUBDIR, fname)
                        html_data['mugshot_alt'] =_('patient photograph from %s') % gmDateTime.pydt_strftime(mugshot['date_generated'], '%B %Y')
                        html_data['mugshot_title'] = gmDateTime.pydt_strftime(mugshot['date_generated'], '%B %Y')
                # - export patient demographics as GDT/XML/VCF/MCF
                pat.export_as_gdt(filename = os.path.join(sandbox_dir, 'patient.gdt'))
                pat.export_as_xml_linuxmednews(filename = os.path.join(sandbox_dir, 'patient.xml'))
                pat.export_as_vcard(filename = os.path.join(sandbox_dir, 'patient.vcf'))
                pat.export_as_mecard(filename = os.path.join(sandbox_dir, 'patient.mcf'))
                # - create CD.INF
                self._create_cd_inf(pat, sandbox_dir)
                # - export items
                docs_list = []
                for item in items:
                        # if it is a dicomdir - put it into the root of the target media
                        if item.is_DICOM_directory:
                                _log.debug('exporting DICOMDIR DIRENTRY')
                                # save into base dir
                                item_fname = item.save_to_file(directory = sandbox_dir)
                                # do not include into ./documents/ listing
                                continue
                        if item.is_valid_DIRENTRY:
                                _log.debug('exporting DIRENTRY')
                                # if there are files in the root dir: put it into a
                                # subdir of ./documents/ where subdir is the leaf
                                # of the item .filename
                                if item.has_files_in_root:
                                        tag, node, local_fs_path = item['filename'].split('::', 2)
                                        subdir = local_fs_path.rstrip('/').split('/')[-1]
                                        subdir = os.path.join(doc_dir, subdir)
                                        gmTools.mkdir(subdir)
                                        item_fname = item.save_to_file(directory = subdir)
                                # if it is subdirs only - put it into documents/ directly
                                else:
                                        item_fname = item.save_to_file(directory = doc_dir)
                                # include into ./documents/ listing
                                fname = os.path.split(item_fname)[1]
                                docs_list.append([fname, gmTools.html_escape_string(item['description'])])
                                continue
                        if item.is_DIRENTRY:
                                # DIRENTRY but not valid: skip
                                continue
                        # normal entry, doc obj links or data item
                        item_fname = item.save_to_file(directory = doc_dir)
                        fname = os.path.split(item_fname)[1]
                        # collect items so we can later correlate items and descriptions
                        # actually we should link to encrypted items, and to unencrypted ones
                        docs_list.append([fname, gmTools.html_escape_string(item['description'])])
                # - link to DICOMDIR if exists
                if 'DICOMDIR' in os.listdir(sandbox_dir):       # in root path
                        has_dicomdir = True
                        html_data['browse_dicomdir'] = '<li><a href="./DICOMDIR">%s</a></li>' % _('show DICOMDIR file')
                else:
                        has_dicomdir = False
                # - include DWV link if there is a DICOMDIR
                if has_dicomdir:
                        # do not clone into media base to avoid encryption,
                        # DWV will be cloned to there later on
                        dwv_sandbox_dir = self._clone_dwv()
                        if dwv_sandbox_dir is not None:
                                html_data['run_dicom_viewer'] = '<li><a href="./dwv/viewers/mobile-local/index.html">%s</a></li>' % _('run Radiology Images (DICOM) Viewer')
                # - create frontpage.html
                frontpage_fname = self._create_frontpage_html(pat, prax, sandbox_dir, html_data, docs_list)
                # - start.html (just a convenience copy of frontpage.html)
                #   (later overwritten, if encryption is requested)
                start_fname = os.path.join(sandbox_dir, 'start.html')
                try:
                        shutil.copy2(frontpage_fname, start_fname)
                except Exception:
                        _log.exception('cannot copy %s to %s', frontpage_fname, start_fname)
                # - index.html (just a convenience copy of frontpage.html)
                #   (later overwritten if encryption is requested)
                index_fname = os.path.join(sandbox_dir, 'index.html')
                try:
                        shutil.copy2(frontpage_fname, index_fname)
                except Exception:
                        _log.exception('cannot copy %s to %s', frontpage_fname, index_fname)

                # 2) encrypt content of sandbox
                if passphrase is not None:
                        encrypted = gmCrypto.encrypt_directory_content (
                                directory = sandbox_dir,
                                receiver_key_ids = None,
                                passphrase = passphrase,
                                comment = None,
                                verbose = _cfg.get(option = 'debug'),
                                remove_unencrypted = True,
                                master_passphrase = master_passphrase,
                                store_passphrase_cb = store_passphrase_of_file_callback
                        )
                        if not encrypted:
                                _log.error('cannot encrypt data in sandbox dir')
                                return False

                # 3) add never-to-be-encrypted data
                # - AUTORUN.INF
                # - README
                if passphrase:
                        self._create_autorun_inf(None, sandbox_dir)
                        self._create_readme(None, sandbox_dir)
                else:
                        self._create_autorun_inf(pat, sandbox_dir)
                        self._create_readme(pat, sandbox_dir)
                # - praxis VCF/MCF
                shutil.move(prax.vcf, os.path.join(sandbox_dir, 'praxis.vcf'))
                prax.export_as_mecard(filename = os.path.join(sandbox_dir, u'praxis.mcf'))
                # - include DWV code
                if has_dicomdir:
                        self._clone_dwv(target_dir = sandbox_dir)
                # - index.html as boilerplate for decryption
                if passphrase:
                        index_fname = self._create_index_html(prax, sandbox_dir, html_data)

                # 4) move sandbox to target dir
                target_dir = gmTools.copy_tree_content(sandbox_dir, target_base_dir)
                if target_dir is None:
                        _log.error('cannot fill target base dir')
                        return False

                return target_dir

        #--------------------------------------------------------
        def export_as_zip(self, base_dir:str=None, items:list=None, passphrase:str=None, master_passphrase:str=None) -> str:
                _log.debug('target dir: %s', base_dir)
                export_dir = self.export(base_dir = base_dir, items = items)
                if export_dir is None:
                        _log.debug('cannot export items')
                        return None

                if not passphrase:
                        zip_file = gmCrypto.create_zip_archive_from_dir (
                                export_dir,
                                comment = _('GNUmed Patient Media'),
                                overwrite = True,
                                verbose = _cfg.get(option = 'debug')
                        )
                        if not zip_file:
                                _log.debug('cannot create zip archive')
                        return zip_file

                zip_file = gmCrypto.create_encrypted_zip_archive_from_dir (
                        export_dir,
                        comment = _('GNUmed Patient Media'),
                        overwrite = True,
                        passphrase = passphrase,
                        verbose = _cfg.get(option = 'debug')
                )
                if zip_file:
                        store_passphrase_of_file(filename = zip_file, passphrase = passphrase, master_passphrase = master_passphrase)
                else:
                        _log.debug('cannot create zip archive')
                return zip_file

        #--------------------------------------------------------
        def _create_index_html(self, praxis, directory, data):
                # header part
                _HTML_data = {
                        'html_title_header': _('Patient data'),
                        'title': _('Patient data excerpt'),
                        'docs_title': _('Documents'),
                        'browse_root': _('browse storage medium'),
                        'browse_docs': _('browse documents area'),
                        'doc_subdir': DOCUMENTS_SUBDIR,
                        'date' : gmTools.html_escape_string(gmDateTime.pydt_strftime(gmDateTime.pydt_now_here(), format = '%Y %B %d'))
                }
                frontpage_fname_enc = 'frontpage.html.asc'
                if os.path.isfile(os.path.join(directory, frontpage_fname_enc)):
                        _HTML_data['frontpage_fname'] = frontpage_fname_enc
                frontpage_fname_enc = 'frontpage.html.7z'
                if os.path.isfile(os.path.join(directory, frontpage_fname_enc)):
                        _HTML_data['frontpage_fname'] = frontpage_fname_enc
                # footer part
                lines = []
                adr = praxis.branch.org_unit.address
                if adr is not None:
                        lines.extend(adr.format())
                for comm in praxis.branch.org_unit.comm_channels:
                        if comm['is_confidential'] is True:
                                continue
                        lines.append('%s: %s' % (
                                comm['l10n_comm_type'],
                                comm['url']
                        ))
                adr = ''
                if len(lines) > 0:
                        adr = gmTools.html_escape_string('\n'.join(lines), replace_eol = True, keep_visual_eol = True)
                _HTML_data['branch'] = gmTools.html_escape_string(praxis['branch'])
                _HTML_data['praxis'] = gmTools.html_escape_string(praxis['praxis'])
                _HTML_data['gm_ver'] = gmTools.html_escape_string(gmTools.coalesce(_cfg.get(option = 'client_version'), 'git HEAD'))
                _HTML_data['adr'] = adr
                # create file
                index_fname = os.path.join(directory, 'index.html')
                index_file = open(index_fname, mode = 'wt', encoding = 'utf8')
                index_file.write(_INDEX_HTML_CONTENT % _HTML_data)
                index_file.close()
                return index_fname

        #--------------------------------------------------------
        def _create_frontpage_html(self, patient, praxis, directory, data, docs_list):
                # <li><a href="documents/filename-1.ext">document 1 description</a></li>
                _HTML_LIST_ITEM = '     <li><a href="%s">%s</a></li>'
                # header part
                _HTML_data = {
                        'html_title_header': _('Patient data for'),
                        'html_title_patient': gmTools.html_escape_string(patient.get_description_gender(with_nickname = False) + ', ' + _('born') + ' ' + patient.get_formatted_dob('%Y %B %d')),
                        'title': _('Patient data excerpt'),
                        'pat_name': gmTools.html_escape_string(patient.get_description_gender(with_nickname = False)),
                        'pat_dob': gmTools.html_escape_string(_('born') + ' ' + patient.get_formatted_dob('%Y %B %d')),
                        'mugshot_url': 'documents/no-such-file.png',
                        'mugshot_alt': _('no patient photograph available'),
                        'mugshot_title': '',
                        'docs_title': _('Documents'),
                        'browse_root': _('browse storage medium'),
                        'browse_docs': _('browse documents area'),
                        'doc_subdir': DOCUMENTS_SUBDIR,
                        'browse_dicomdir': '',
                        'run_dicom_viewer': '',
                        'date' : gmTools.html_escape_string(gmDateTime.pydt_strftime(gmDateTime.pydt_now_here(), format = '%Y %B %d'))
                }
                for key in data:
                        _HTML_data[key] = data[key]

                # documents part
                _HTML_docs_list = []
                for doc in docs_list:
                        subdir = os.path.join(directory, DOCUMENTS_SUBDIR, doc[0])
                        if os.path.isdir(subdir):
                                _HTML_docs_list.append(_HTML_LIST_ITEM % (os.path.join(DOCUMENTS_SUBDIR, doc[0]), _('DIRECTORY: %s/%s/') % (DOCUMENTS_SUBDIR, doc[0])))
                                _HTML_docs_list.append('        <ul>')
                                for fname in os.listdir(subdir):
                                        tmp = os.path.join(subdir, fname)
                                        if os.path.isdir(tmp):
                                                _HTML_docs_list.append('                <li><a href="%s">%s</a></li>' % (os.path.join(DOCUMENTS_SUBDIR, doc[0], fname), _('DIRECTORY: %s/%s/%s/') % (DOCUMENTS_SUBDIR, doc[0], fname)))
                                        else:
                                                _HTML_docs_list.append('                <li><a href="%s">%s</a></li>' % (os.path.join(DOCUMENTS_SUBDIR, doc[0], fname), fname))
                                _HTML_docs_list.append('        </ul>')
                        else:
                                _HTML_docs_list.append(_HTML_LIST_ITEM % (os.path.join(DOCUMENTS_SUBDIR, doc[0]), doc[1]))
                _HTML_data['docs_list'] = u'\n  '.join(_HTML_docs_list)

                # footer part
                lines = []
                adr = praxis.branch.org_unit.address
                if adr is not None:
                        lines.extend(adr.format())
                for comm in praxis.branch.org_unit.comm_channels:
                        if comm['is_confidential'] is True:
                                continue
                        lines.append('%s: %s' % (
                                comm['l10n_comm_type'],
                                comm['url']
                        ))
                adr = ''
                if len(lines) > 0:
                        adr = gmTools.html_escape_string('\n'.join(lines), replace_eol = True, keep_visual_eol = True)
                _HTML_data['branch'] = gmTools.html_escape_string(praxis['branch'])
                _HTML_data['praxis'] = gmTools.html_escape_string(praxis['praxis'])
                _HTML_data['gm_ver'] = gmTools.html_escape_string(gmTools.coalesce(_cfg.get(option = 'client_version'), 'git HEAD'))
                _HTML_data['adr'] = adr
                # create file
                frontpage_fname = os.path.join(directory, 'frontpage.html')
                frontpage_file = open(frontpage_fname, mode = 'wt', encoding = 'utf8')
                frontpage_file.write(_FRONTPAGE_HTML_CONTENT % _HTML_data)
                frontpage_file.close()
                return frontpage_fname

        #--------------------------------------------------------
        def _clone_dwv(self, target_dir=None):
                _log.debug('cloning dwv')
                # find DWV
                dwv_src_dir = os.path.join(gmTools.gmPaths().local_base_dir, 'resources', 'dwv4export')
                if not os.path.isdir(dwv_src_dir):
                        _log.debug('[%s] not found', dwv_src_dir)
                        dwv_src_dir = os.path.join(gmTools.gmPaths().system_app_data_dir, 'resources', 'dwv4export')
                if not os.path.isdir(dwv_src_dir):
                        _log.debug('[%s] not found', dwv_src_dir)
                        return None
                # clone it
                if target_dir is None:
                        target_dir = gmTools.mk_sandbox_dir()
                dwv_target_dir = os.path.join(target_dir, 'dwv')
                gmTools.rmdir(dwv_target_dir)
                try:
                        shutil.copytree(dwv_src_dir, dwv_target_dir)
                except (shutil.Error, OSError):
                        _log.exception('cannot include DWV, skipping')
                        return None

                return dwv_target_dir

        #--------------------------------------------------------
        def _create_readme(self, patient, directory):
                _README_CONTENT = (
                        'This is a patient data excerpt created by the GNUmed Electronic Medical Record.\n'
                        '\n'
                        'Patient: %s\n'
                        '\n'
                        'Please display <frontpage.html> to browse patient data.\n'
                        '\n'
                        'Individual documents are stored in the subdirectory\n'
                        '\n'
                        '       documents/\n'
                        '\n'
                        '\n'
                        'Data may need to be decrypted with either GNU Privacy\n'
                        'Guard or 7zip/WinZip.\n'
                        '\n'
                        '.asc:\n'
                        '       https://gnupg.org\n'
                        '\n'
                        '.7z:\n'
                        '       https://www.7-zip.org\n'
                        '       https://www.winzip.com\n'
                        '\n'
                        'To obtain any needed keys you will have to get in touch with\n'
                        'the creator or the owner of this patient media excerpt.\n'
                )
                readme_fname = os.path.join(directory, 'README')
                readme_file = open(readme_fname, mode = 'wt', encoding = 'utf8')
                if patient is None:
                        pat_str = _('<protected>')
                else:
                        pat_str = patient.get_description_gender(with_nickname = False) + ', ' + _('born') + ' ' + patient.get_formatted_dob('%Y %B %d')
                readme_file.write(_README_CONTENT % pat_str)
                readme_file.close()
                return readme_fname

        #--------------------------------------------------------
        def _create_cd_inf(self, patient, directory):
                _CD_INF_CONTENT = (
                        '[Patient Info]\r\n'                                    # needs \r\n for Windows
                        'PatientName=%s, %s\r\n'
                        'Gender=%s\r\n'
                        'BirthDate=%s\r\n'
                        'CreationDate=%s\r\n'
                        'PID=%s\r\n'
                        'EMR=GNUmed\r\n'
                        'Version=%s\r\n'
                        '#StudyDate=\r\n'
                        '#VNRInfo=<body part>\r\n'
                        '\r\n'
                        '# name format: lastnames, firstnames\r\n'
                        '# date format: YYYY-MM-DD (ISO 8601)\r\n'
                        '# gender format: %s\r\n'
                )
                fname = os.path.join(directory, 'CD.INF')
                cd_inf = open(fname, mode = 'wt', encoding = 'utf8')
                cd_inf.write(_CD_INF_CONTENT % (
                        patient['lastnames'],
                        patient['firstnames'],
                        gmTools.coalesce(patient['gender'], '?'),
                        patient.get_formatted_dob('%Y-%m-%d'),
                        gmDateTime.pydt_strftime(gmDateTime.pydt_now_here(), format = '%Y-%m-%d'),
                        patient.ID,
                        _cfg.get(option = 'client_version'),
                        ' / '.join([ '%s = %s (%s)' % (g['tag'], g['label'], g['l10n_label']) for g in patient.gender_list ])
                ))
                cd_inf.close()
                return fname

        #--------------------------------------------------------
        def _create_autorun_inf(self, patient, directory):
                _AUTORUN_INF_CONTENT = (                                                        # needs \r\n for Windows
                        '[AutoRun.Amd64]\r\n'                                   # 64 bit
                        'label=%(label)s\r\n'                                   # patient name/DOB
                        'shellexecute=index.html\r\n'
                        'action=%(action)s\r\n'                                 # % _('Browse patient data')
                        '%(icon)s\r\n'                                                  # "icon=gnumed.ico" or ""
                        'UseAutoPlay=1\r\n'
                        '\r\n'
                        '[AutoRun]\r\n'                                                 # 32 bit
                        'label=%(label)s\r\n'                                   # patient name/DOB
                        'shellexecute=index.html\r\n'
                        'action=%(action)s\r\n'                                 # % _('Browse patient data')
                        '%(icon)s\r\n'                                                  # "icon=gnumed.ico" or ""
                        'UseAutoPlay=1\r\n'
                        '\r\n'
                        '[Content]\r\n'
                        'PictureFiles=yes\r\n'
                        'VideoFiles=yes\r\n'
                        'MusicFiles=no\r\n'
                        '\r\n'
                        '[IgnoreContentPaths]\r\n'
                        '\documents\r\n'
                        '\r\n'
                        '[unused]\r\n'
                        'open=requires explicit executable\r\n'
                )
                autorun_dict = {
                        'label': self._compute_autorun_inf_label(patient),
                        'action': _('Browse patient data'),
                        'icon': ''
                }
                media_icon_kwd = '$$gnumed_patient_media_export_icon'
                media_icon_kwd_exp = gmKeywordExpansion.get_expansion (
                        keyword = media_icon_kwd,
                        textual_only = False,
                        binary_only = True
                )
                icon_tmp_fname = media_icon_kwd_exp.save_to_file (
                        target_mime = 'image/x-icon',
                        target_extension = '.ico',
                        ignore_conversion_problems = True
                )
                if icon_tmp_fname is None:
                        _log.error('cannot retrieve <%s>', media_icon_kwd)
                else:
                        media_icon_fname = os.path.join(directory, 'gnumed.ico')
                        try:
                                shutil.copy2(icon_tmp_fname, media_icon_fname)
                                autorun_dict['icon'] = 'icon=gnumed.ico'
                        except Exception:
                                _log.exception('cannot move %s to %s', icon_tmp_fname, media_icon_fname)
                autorun_fname = os.path.join(directory, 'AUTORUN.INF')
                autorun_file = open(autorun_fname, mode = 'wt', encoding = 'cp1252', errors = 'replace')
                autorun_file.write(_AUTORUN_INF_CONTENT % autorun_dict)
                autorun_file.close()
                return autorun_fname

        #--------------------------------------------------------
        def _compute_autorun_inf_label(self, patient):
                if patient is None:
                        # if no patient provided - assume the target media
                        # is to be encrypted and thusly do not expose patient data,
                        # AUTORUN.INF itself will not be encrypted because
                        # Windows must be able to parse it
                        return _('GNUmed patient data excerpt')[:32]

                LABEL_MAX_LEN = 32
                dob = patient.get_formatted_dob(format = ' %Y%m%d', none_string = '', honor_estimation = False)
                if dob == '':
                        gender_template = ' (%s)'
                else:
                        gender_template = ' %s'
                gender = gmTools.coalesce(patient['gender'], '', gender_template)
                name_max_len = LABEL_MAX_LEN - len(gender) - len(dob)                   # they already include appropriate padding
                name = patient.active_name
                last = name['lastnames'].strip()
                first = name['firstnames'].strip()
                len_last = len(last)
                len_first = len(first)
                while (len_last + len_first + 1) > name_max_len:
                        if len_first > 6:
                                len_first -= 1
                                if first[len_first - 1] == ' ':
                                        len_first -= 1
                                continue
                        len_last -= 1
                        if last[len_last - 1] == ' ':
                                len_last -= 1
                last = last[:len_last].strip().upper()
                first = first[:len_first].strip()
                # max 32 chars, supposedly ASCII, but CP1252 likely works pretty well
                label = (('%s %s%s%s' % (last, first, dob,      gender)).strip())[:32]
                return label

        #--------------------------------------------------------
        # properties
        #--------------------------------------------------------
        def get_items(self, designation=None, order_by='pk_identity, list_position, designation, description'):
                return get_export_items(order_by = order_by, pk_identity = self.__pk_identity, designation = designation)

        items = property(get_items)

        #--------------------------------------------------------
        def get_printouts(self, order_by='list_position, designation, description'):
                return get_print_jobs(order_by = order_by, pk_identity = self.__pk_identity)

        printouts = property(get_printouts)

Instance variables

var items
Expand source code
def get_items(self, designation=None, order_by='pk_identity, list_position, designation, description'):
        return get_export_items(order_by = order_by, pk_identity = self.__pk_identity, designation = designation)
var printouts
Expand source code
def get_printouts(self, order_by='list_position, designation, description'):
        return get_print_jobs(order_by = order_by, pk_identity = self.__pk_identity)

Methods

def add_documents(self, documents: list = None) ‑> None
Expand source code
def add_documents(self, documents:list=None) -> None:
        for doc in documents:
                doc_tag = _('%s (%s)%s') % (
                        doc['l10n_type'],
                        gmDateTime.pydt_strftime(doc['clin_when'], '%Y %b %d'),
                        gmTools.coalesce(doc['comment'], '', ' "%s"')
                )
                for obj in doc.parts:
                        if self.document_part_item_exists(pk_part = obj['pk_obj']):
                                continue
                        f_ext = ''
                        if obj['filename'] is not None:
                                f_ext = os.path.splitext(obj['filename'])[1].strip('.').strip()
                        if f_ext != '':
                                f_ext = ' .' + f_ext.upper()
                        obj_tag = _('part %s (%s%s)%s') % (
                                obj['seq_idx'],
                                gmTools.size2str(obj['size']),
                                f_ext,
                                gmTools.coalesce(obj['obj_comment'], '', ' "%s"')
                        )
                        create_export_item (
                                description = '%s - %s' % (doc_tag, obj_tag),
                                pk_doc_obj = obj['pk_obj']
                        )
def add_file(self, filename=None, hint=None)
Expand source code
def add_file(self, filename=None, hint=None):
        try:
                open(filename).close()
        except Exception:
                _log.exception('cannot open file <%s>', filename)
                return None

        file_md5 = gmTools.file2md5(filename = filename, return_hex = True)
        existing_item = self.md5_exists(md5 = file_md5, include_document_parts = False)
        if existing_item is not None:
                _log.debug('md5 match (%s): %s already in export area', file_md5, filename)
                return existing_item

        path, basename = os.path.split(filename)
        item = create_export_item (
                description = '%s: %s (%s/)' % (
                        gmTools.coalesce(hint, _('file'), '%s'),
                        basename,
                        path
                ),
                pk_identity = self.__pk_identity,
                filename = filename
        )

        if item.update_data_from_file(filename = filename):
                return item

        # failed to insert data, hence remove export item entry
        delete_export_item(pk_export_item = item['pk_export_item'])
        return None
def add_files(self, filenames=None, hint=None)
Expand source code
def add_files(self, filenames=None, hint=None):
        all_ok = True
        for fname in filenames:
                all_ok = all_ok and (self.add_file(filename = fname, hint = hint) is not None)

        return all_ok
def add_form(self, form=None, designation=None)
Expand source code
def add_form(self, form=None, designation=None):

        if len(form.final_output_filenames) == 0:
                return True

        items = []
        for fname in form.final_output_filenames:
                item = self.add_file(filename = fname)
                if item is None:
                        for prev_item in items:
                                delete_export_item(pk_export_item = prev_item['pk_export_item'])
                        return False
                items.append(item)
                item['description'] = _('form: %s %s (%s)') % (form.template['name_long'], form.template['external_version'], fname)
                item['designation'] = designation
                item.save()

        return True
def add_forms(self, forms=None, designation=None)
Expand source code
def add_forms(self, forms=None, designation=None):
        all_ok = True
        for form in forms:
                all_ok = all_ok and self.add_form(form = form, designation = designation)

        return all_ok
def add_path(self, path, comment=None)

Add a DIR entry to the export area.

This sort of entry points to a certain directory on a certain machine. The content of the the directory will be included in exports, the directory itself will not. For that, use a disposable top-level directory into which you put the directory to include as a subdirectory.

Expand source code
def add_path(self, path, comment=None):
        """Add a DIR entry to the export area.

        This sort of entry points to a certain directory on a
        certain machine. The content of the the directory
        will be included in exports, the directory *itself*
        will not. For *that*, use a disposable top-level
        directory into which you put the directory to include
        as a subdirectory.
        """
        assert (os.path.isdir(path)), '<path> must exist: %s' % path

        path_item_data = 'DIR::%s::%s/' % (platform.node(), path.rstrip('/'))
        _log.debug('attempting to add path item [%s]', path_item_data)
        item = self.path_item_exists(path_item_data)
        if item is not None:
                _log.debug('[%s] already in export area', path)
                return item

        if comment is None:
                comment = _('path [%s/] on computer "%s"') % (
                        path.rstrip('/'),
                        platform.node()
                )
        else:
                comment += _(' (on "%s")') % platform.node()

        item = create_export_item (
                description = comment,
                pk_identity = self.__pk_identity,
                filename = path_item_data
        )
        try:
                README = open(os.path.join(path, DIRENTRY_README_NAME), mode = 'wt', encoding = 'utf8')
                README.write('GNUmed DIRENTRY information\n')
                README.write('created: %s\n' % gmDateTime.pydt_now_here())
                README.write('machine: %s\n' % platform.node())
                README.write('path: %s\n' % path)
                README.close()
        except OSError:
                _log.exception('READONLY DIRENTRY [%s]', path)

        return item
def document_part_item_exists(self, pk_part=None)
Expand source code
def document_part_item_exists(self, pk_part=None):
        cmd = "SELECT EXISTS (SELECT 1 FROM clin.export_item WHERE fk_doc_obj = %(pk_obj)s)"
        args = {'pk_obj': pk_part}
        rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])
        return rows[0][0]
def dump_items_to_disk(self, base_dir: str = None, items: list = None, passphrase: str = None, convert2pdf: bool = False, master_passphrase: str = None) ‑> str

Dump export area items to disk.

Args

base_dir
directory into which to dump the items, defaults to a new sandbox directory
items
which items to dump, defaults to all items
passphrase
used to symmetrically encrypt dumped files, if set
convert2pdf
attempt to convert items into PDF before dumping them
master_passphrase
encrypt passphrase with this password for safekeeping in GNUmed if no pubkeys available

Returns

The base_dir into which items were dumped, or None.

Expand source code
def dump_items_to_disk(self, base_dir:str=None, items:list=None, passphrase:str=None, convert2pdf:bool=False, master_passphrase:str=None) -> str:
        """Dump export area items to disk.

        Args:
                base_dir: directory into which to dump the items, defaults to a new sandbox directory
                items: which items to dump, defaults to all items
                passphrase: used to symmetrically encrypt dumped files, if set
                convert2pdf: attempt to convert items into PDF before dumping them
                master_passphrase: encrypt passphrase with this password for safekeeping in GNUmed if no pubkeys available

        Returns:
                The base_dir into which items were dumped, or None.
        """
        if items is None:
                items = self.items
        if len(items) == 0:
                return None

        if base_dir is None:
                base_dir = gmTools.mk_sandbox_dir()
        else:
                gmTools.mkdir(base_dir)
        _log.debug('dumping export items to: %s', base_dir)
        for item in items:
                saved_fname = item.save_to_file(directory = base_dir, passphrase = passphrase, convert2pdf = convert2pdf)
                if saved_fname is None:
                        return None

                if passphrase:
                        store_passphrase_of_file(filename = saved_fname, passphrase = passphrase, master_passphrase = master_passphrase)

        return base_dir
def dump_items_to_disk_as_zip(self, base_dir: str = None, items: list = None, passphrase: str = None, master_passphrase: str = None) ‑> str

Dump items to disk into a zip archive.

    Calls dump_items_to_disk().

Args

base_dir
directory into which to dump the items, defaults to a new sandbox directory
items
which items to dump, defaults to all items
passphrase
used to symmetrically encrypt dumped files, if set
master_passphrase
encrypt passphrase with this password for safekeeping in GNUmed if no pubkeys available

Returns

Zip archive name, or None.

Expand source code
def dump_items_to_disk_as_zip(self, base_dir:str=None, items:list=None, passphrase:str=None, master_passphrase:str=None) -> str:
        """Dump items to disk into a zip archive.

                Calls dump_items_to_disk().

        Args:
                base_dir: directory into which to dump the items, defaults to a new sandbox directory
                items: which items to dump, defaults to all items
                passphrase: used to symmetrically encrypt dumped files, if set
                master_passphrase: encrypt passphrase with this password for safekeeping in GNUmed if no pubkeys available

        Returns:
                Zip archive name, or None.
        """
        _log.debug('target dir: %s', base_dir)
        dump_dir = self.dump_items_to_disk(base_dir = base_dir, items = items)
        if dump_dir is None:
                _log.error('cannot dump export area items')
                return None

        if passphrase:
                zip_file = gmCrypto.create_encrypted_zip_archive_from_dir (
                        source_dir = dump_dir,
                        comment = _('GNUmed Patient Media'),
                        overwrite = True,
                        passphrase = passphrase,
                        verbose = _cfg.get(option = 'debug')
                )
                if zip_file:
                        store_passphrase_of_file(filename = zip_file, passphrase = passphrase, master_passphrase = master_passphrase)
                else:
                        _log.error('cannot zip+encrypt export area items dump')
                return zip_file

        zip_file = gmCrypto.create_zip_archive_from_dir (
                dump_dir,
                comment = _('GNUmed Patient Media'),
                overwrite = True,
                verbose = _cfg.get(option = 'debug')
        )
        if not zip_file:
                _log.error('cannot zip export area items dump')
        return zip_file
def export(self, base_dir: str = None, items: list = None, passphrase: str = None, master_passphrase: str = None) ‑> bool | str

Export items as structured patient media.

Args

base_dir
directory into which to store the export, defaults to a new sandbox directory based on patient name
items
which items to dump, defaults to all items
passphrase
used to symmetrically encrypt dumped files, if set
master_passphrase
password to encrypt passphrase with for safekeeping, if no public keys available for that

Returns

Base_dir name, else None or False on failure.

Expand source code
def export(self, base_dir:str=None, items:list=None, passphrase:str=None, master_passphrase:str=None) -> bool|str:
        """Export items as structured patient media.

        Args:
                base_dir: directory into which to store the export, defaults to a new sandbox directory based on patient name
                items: which items to dump, defaults to all items
                passphrase: used to symmetrically encrypt dumped files, if set
                master_passphrase: password to encrypt passphrase with for safekeeping, if no public keys available for that

        Returns:
                Base_dir name, else None or False on failure.
        """
        if items is None:
                items = self.items
        if len(items) == 0:
                return None

        from Gnumed.business.gmPerson import cPatient
        pat = cPatient(aPK_obj = self.__pk_identity)
        target_base_dir = base_dir
        if target_base_dir is None:
                target_sandbox_dir = gmTools.mk_sandbox_dir()
                target_base_dir = os.path.join(target_sandbox_dir, pat.subdir_name)
        gmTools.mkdir(target_base_dir)
        _log.debug('patient media base dir: %s', target_base_dir)
        if not gmTools.dir_is_empty(target_base_dir):
                _log.error('patient media base dir is not empty')
                return False

        from Gnumed.business.gmPraxis import gmCurrentPraxisBranch
        prax = gmCurrentPraxisBranch()

        html_data = {}

        # 1) assemble everything into a sandbox
        # - setup sandbox
        sandbox_dir = gmTools.mk_sandbox_dir()
        _log.debug('sandbox dir: %s', sandbox_dir)
        doc_dir = os.path.join(sandbox_dir, DOCUMENTS_SUBDIR)
        gmTools.mkdir(doc_dir)
        # - export mugshot
        mugshot = pat.document_folder.latest_mugshot
        if mugshot is not None:
                mugshot_fname = mugshot.save_to_file(directory = doc_dir)
                fname = os.path.split(mugshot_fname)[1]
                html_data['mugshot_url'] = os.path.join(DOCUMENTS_SUBDIR, fname)
                html_data['mugshot_alt'] =_('patient photograph from %s') % gmDateTime.pydt_strftime(mugshot['date_generated'], '%B %Y')
                html_data['mugshot_title'] = gmDateTime.pydt_strftime(mugshot['date_generated'], '%B %Y')
        # - export patient demographics as GDT/XML/VCF/MCF
        pat.export_as_gdt(filename = os.path.join(sandbox_dir, 'patient.gdt'))
        pat.export_as_xml_linuxmednews(filename = os.path.join(sandbox_dir, 'patient.xml'))
        pat.export_as_vcard(filename = os.path.join(sandbox_dir, 'patient.vcf'))
        pat.export_as_mecard(filename = os.path.join(sandbox_dir, 'patient.mcf'))
        # - create CD.INF
        self._create_cd_inf(pat, sandbox_dir)
        # - export items
        docs_list = []
        for item in items:
                # if it is a dicomdir - put it into the root of the target media
                if item.is_DICOM_directory:
                        _log.debug('exporting DICOMDIR DIRENTRY')
                        # save into base dir
                        item_fname = item.save_to_file(directory = sandbox_dir)
                        # do not include into ./documents/ listing
                        continue
                if item.is_valid_DIRENTRY:
                        _log.debug('exporting DIRENTRY')
                        # if there are files in the root dir: put it into a
                        # subdir of ./documents/ where subdir is the leaf
                        # of the item .filename
                        if item.has_files_in_root:
                                tag, node, local_fs_path = item['filename'].split('::', 2)
                                subdir = local_fs_path.rstrip('/').split('/')[-1]
                                subdir = os.path.join(doc_dir, subdir)
                                gmTools.mkdir(subdir)
                                item_fname = item.save_to_file(directory = subdir)
                        # if it is subdirs only - put it into documents/ directly
                        else:
                                item_fname = item.save_to_file(directory = doc_dir)
                        # include into ./documents/ listing
                        fname = os.path.split(item_fname)[1]
                        docs_list.append([fname, gmTools.html_escape_string(item['description'])])
                        continue
                if item.is_DIRENTRY:
                        # DIRENTRY but not valid: skip
                        continue
                # normal entry, doc obj links or data item
                item_fname = item.save_to_file(directory = doc_dir)
                fname = os.path.split(item_fname)[1]
                # collect items so we can later correlate items and descriptions
                # actually we should link to encrypted items, and to unencrypted ones
                docs_list.append([fname, gmTools.html_escape_string(item['description'])])
        # - link to DICOMDIR if exists
        if 'DICOMDIR' in os.listdir(sandbox_dir):       # in root path
                has_dicomdir = True
                html_data['browse_dicomdir'] = '<li><a href="./DICOMDIR">%s</a></li>' % _('show DICOMDIR file')
        else:
                has_dicomdir = False
        # - include DWV link if there is a DICOMDIR
        if has_dicomdir:
                # do not clone into media base to avoid encryption,
                # DWV will be cloned to there later on
                dwv_sandbox_dir = self._clone_dwv()
                if dwv_sandbox_dir is not None:
                        html_data['run_dicom_viewer'] = '<li><a href="./dwv/viewers/mobile-local/index.html">%s</a></li>' % _('run Radiology Images (DICOM) Viewer')
        # - create frontpage.html
        frontpage_fname = self._create_frontpage_html(pat, prax, sandbox_dir, html_data, docs_list)
        # - start.html (just a convenience copy of frontpage.html)
        #   (later overwritten, if encryption is requested)
        start_fname = os.path.join(sandbox_dir, 'start.html')
        try:
                shutil.copy2(frontpage_fname, start_fname)
        except Exception:
                _log.exception('cannot copy %s to %s', frontpage_fname, start_fname)
        # - index.html (just a convenience copy of frontpage.html)
        #   (later overwritten if encryption is requested)
        index_fname = os.path.join(sandbox_dir, 'index.html')
        try:
                shutil.copy2(frontpage_fname, index_fname)
        except Exception:
                _log.exception('cannot copy %s to %s', frontpage_fname, index_fname)

        # 2) encrypt content of sandbox
        if passphrase is not None:
                encrypted = gmCrypto.encrypt_directory_content (
                        directory = sandbox_dir,
                        receiver_key_ids = None,
                        passphrase = passphrase,
                        comment = None,
                        verbose = _cfg.get(option = 'debug'),
                        remove_unencrypted = True,
                        master_passphrase = master_passphrase,
                        store_passphrase_cb = store_passphrase_of_file_callback
                )
                if not encrypted:
                        _log.error('cannot encrypt data in sandbox dir')
                        return False

        # 3) add never-to-be-encrypted data
        # - AUTORUN.INF
        # - README
        if passphrase:
                self._create_autorun_inf(None, sandbox_dir)
                self._create_readme(None, sandbox_dir)
        else:
                self._create_autorun_inf(pat, sandbox_dir)
                self._create_readme(pat, sandbox_dir)
        # - praxis VCF/MCF
        shutil.move(prax.vcf, os.path.join(sandbox_dir, 'praxis.vcf'))
        prax.export_as_mecard(filename = os.path.join(sandbox_dir, u'praxis.mcf'))
        # - include DWV code
        if has_dicomdir:
                self._clone_dwv(target_dir = sandbox_dir)
        # - index.html as boilerplate for decryption
        if passphrase:
                index_fname = self._create_index_html(prax, sandbox_dir, html_data)

        # 4) move sandbox to target dir
        target_dir = gmTools.copy_tree_content(sandbox_dir, target_base_dir)
        if target_dir is None:
                _log.error('cannot fill target base dir')
                return False

        return target_dir
def export_as_zip(self, base_dir: str = None, items: list = None, passphrase: str = None, master_passphrase: str = None) ‑> str
Expand source code
def export_as_zip(self, base_dir:str=None, items:list=None, passphrase:str=None, master_passphrase:str=None) -> str:
        _log.debug('target dir: %s', base_dir)
        export_dir = self.export(base_dir = base_dir, items = items)
        if export_dir is None:
                _log.debug('cannot export items')
                return None

        if not passphrase:
                zip_file = gmCrypto.create_zip_archive_from_dir (
                        export_dir,
                        comment = _('GNUmed Patient Media'),
                        overwrite = True,
                        verbose = _cfg.get(option = 'debug')
                )
                if not zip_file:
                        _log.debug('cannot create zip archive')
                return zip_file

        zip_file = gmCrypto.create_encrypted_zip_archive_from_dir (
                export_dir,
                comment = _('GNUmed Patient Media'),
                overwrite = True,
                passphrase = passphrase,
                verbose = _cfg.get(option = 'debug')
        )
        if zip_file:
                store_passphrase_of_file(filename = zip_file, passphrase = passphrase, master_passphrase = master_passphrase)
        else:
                _log.debug('cannot create zip archive')
        return zip_file
def get_items(self, designation=None, order_by='pk_identity, list_position, designation, description')
Expand source code
def get_items(self, designation=None, order_by='pk_identity, list_position, designation, description'):
        return get_export_items(order_by = order_by, pk_identity = self.__pk_identity, designation = designation)
def get_printouts(self, order_by='list_position, designation, description')
Expand source code
def get_printouts(self, order_by='list_position, designation, description'):
        return get_print_jobs(order_by = order_by, pk_identity = self.__pk_identity)
def md5_exists(self, md5=None, include_document_parts=False)
Expand source code
def md5_exists(self, md5=None, include_document_parts=False):
        where_parts = [
                'pk_identity = %(pat)s',
                'md5_sum = %(md5)s'
        ]
        args = {
                'pat': self.__pk_identity,
                'md5': md5
        }

        if not include_document_parts:
                where_parts.append('pk_doc_obj IS NULL')

        cmd = _SQL_get_export_items % ' AND '.join(where_parts)
        rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])

        if len(rows) == 0:
                return None

        r = rows[0]
        return cExportItem(row = {'data': r, 'pk_field': 'pk_export_item'})
def path_item_exists(self, path_item_data)
Expand source code
def path_item_exists(self, path_item_data):

        assert (path_item_data.startswith('DIR::')), 'invalid <path_item_data> [%s]' % path_item_data

        where_parts = [
                'pk_identity = %(pat)s',
                'filename = %(fname)s',
                'pk_doc_obj IS NULL'
        ]
        args = {
                'pat': self.__pk_identity,
                'fname': path_item_data
        }
        SQL = _SQL_get_export_items % ' AND '.join(where_parts)
        rows = gmPG2.run_ro_queries(queries = [{'cmd': SQL, 'args': args}])
        if len(rows) == 0:
                return None

        r = rows[0]
        return cExportItem(row = {'data': r, 'pk_field': 'pk_export_item'})
def remove_item(self, item)
Expand source code
def remove_item(self, item):
        if item.is_valid_DIRENTRY:
                gmTools.remove_file(os.path.join(item.DIRENTRY_path, DIRENTRY_README_NAME))
        return delete_export_item(pk_export_item = item['pk_export_item'])
class cExportItem (aPK_obj=None, row=None, link_obj=None)

Represents an item in the export area table

Call init from child classes like so:

    super().__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)

Args

aPK_obj
retrieve data from backend
  • an scalar value the ._cmd_fetch_payload WHERE condition must be a simple column: "… WHERE pk_col = %s"
  • a dictionary of values the ._cmd_fetch_payload WHERE condition must consume the dictionary and produce a unique row
row
must hold the fields
  • data: list of column values for the row selected by ._cmd_fetch_payload (as returned by cursor.fetchone() in the DB-API)
  • pk_field: the name of the primary key column OR
  • pk_obj: a dictionary suitable for being passed to cursor.execute and holding the primary key values, used for composite PKs
  • for example:
    row = {
            'data': rows[0],
            'pk_field': 'pk_XXX (the PK column name)',
            'pk_obj': {'pk_col1': pk_col1_val, 'pk_col2': pk_col2_val}
    }
    
    rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])
    objects = [ cChildClass(row = {'data': r, 'pk_field': 'the PK column name'}) for r in rows ]
    
Expand source code
class cExportItem(gmBusinessDBObject.cBusinessDBObject):
        """Represents an item in the export area table"""

        _cmd_fetch_payload = _SQL_get_export_items % "pk_export_item = %s"
        _cmds_store_payload = [
                """UPDATE clin.export_item SET
                                fk_identity = %(pk_identity)s,
                                created_by = gm.nullify_empty_string(%(created_by)s),
                                created_when = %(created_when)s,
                                designation = gm.nullify_empty_string(%(designation)s),
                                description = gm.nullify_empty_string(%(description)s),
                                fk_doc_obj = %(pk_doc_obj)s,
                                data = CASE
                                        WHEN %(pk_doc_obj)s IS NULL THEN coalesce(data, 'to be replaced by real data')
                                        ELSE NULL
                                END,
                                filename = CASE
                                        WHEN %(pk_doc_obj)s IS NULL THEN gm.nullify_empty_string(%(filename)s)
                                        ELSE NULL
                                END
                        WHERE
                                pk = %(pk_export_item)s
                                        AND
                                xmin = %(xmin_export_item)s
                """,
                "SELECT clin.export_item_set_list_position(%(pk_export_item)s, %(list_position)s)",
                _SQL_get_export_items % 'pk_export_item = %(pk_export_item)s'
        ]
        _updatable_fields = [
                'pk_identity',
                'created_when',
                'designation',
                'description',
                'pk_doc_obj',
                'filename',
                'list_position'
        ]
        #--------------------------------------------------------
        def __init__(self, aPK_obj=None, row=None, link_obj=None):
                super(cExportItem, self).__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)
                # force auto-healing if need be
                if self._payload['pk_identity_raw_needs_update']:
                        _log.warning (
                                'auto-healing export item [%s] from identity [%s] to [%s] because of document part [%s] seems necessary',
                                self._payload['pk_export_item'],
                                self._payload['pk_identity_raw'],
                                self._payload['pk_identity'],
                                self._payload['pk_doc_obj']
                        )
                        if self._payload['pk_doc_obj'] is None:
                                _log.error('however, .fk_doc_obj is NULL, which should not happen, leaving things alone for manual inspection')
                                return
                        # only flag ourselves as modified, do not actually
                        # modify any values, better safe than sorry
                        self._is_modified = True
                        self.save()
                        self.refetch_payload(ignore_changes = False, link_obj = link_obj)

        #--------------------------------------------------------
#       def format(self):
#               return u'%s' % self
        #--------------------------------------------------------
        def update_data(self, data=None):
                assert (data is not None), '<data> must not be <None>'

                if self.is_DIRENTRY or self.is_document_part:
                        return False

                SQL = """
                        UPDATE clin.export_item SET
                                data = %(data)s::bytea,
                                fk_doc_obj = NULL
                        WHERE pk = %(pk)s"""
                args = {'pk': self.pk_obj, 'data': data}
                gmPG2.run_rw_queries(queries = [{'cmd': SQL, 'args': args}], return_data = False)
                # must update XMIN now ...
                self.refetch_payload()
                return True

        #--------------------------------------------------------
        def update_data_from_file(self, filename=None, convert_document_part=False):

                if self.is_DIRENTRY:
                        return False

                if self.is_document_part:
                        if not convert_document_part:
                                return False

                # sanity check
                if not (os.access(filename, os.R_OK) and os.path.isfile(filename)):
                        _log.error('[%s] is not a readable file' % filename)
                        return False

                cmd = """
                        UPDATE clin.export_item SET
                                data = %(data)s::bytea,
                                fk_doc_obj = NULL,
                                filename = gm.nullify_empty_string(%(fname)s)
                        WHERE pk = %(pk)s"""
                args = {'pk': self.pk_obj, 'fname': filename}
                if not gmPG2.file2bytea(query = cmd, filename = filename, args = args):
                        return False

                # must update XMIN now ...
                self.refetch_payload()
                return True

        #--------------------------------------------------------
        def save_to_file(self, aChunkSize:int=0, filename:str=None, directory:str=None, passphrase:str=None, convert2pdf:bool=False) -> str:
                """Save this entry to disk.

                Args:
                        filename: The filename to save into, unless this is a DIRENTRY.
                        directory: The directory to save the DIRENTRY _content into, if this is a DIRENTRY.
                        passphrase: Encrypt file(s) with this passphrase, if not _None_.
                        convert2pdf: Convert file(s) to PDF on the way out. Before encryption, that is.

                Returns:
                        Directory for DIRENTRIES, or filename, or None on failure.
                """
                if self.is_DIRENTRY and convert2pdf:
                        # cannot convert dir entries to PDF
                        return None

                # data linked from archive ?
                part_fname = self.__save_doc_obj (
                        filename = filename,
                        directory = directory,
                        passphrase = passphrase,
                        convert2pdf = convert2pdf
                )
                if part_fname is False:
                        return None

                if part_fname is not None:
                        return part_fname

                # valid DIRENTRY ?
                if self.is_valid_DIRENTRY:
                        target_dir = self.__save_direntry (
                                directory,
                                passphrase = passphrase
                        )
                        if target_dir is False:
                                return None

                        if target_dir is not None:
                                return target_dir

                # but still a DIRENTRY ?
                if self.is_DIRENTRY:
                        # yes, but apparently not valid (anymore), say,
                        # other machine or dir not found locally
                        return None

                # normal item with data in export area table
                return self.__save_normal_item (
                        filename = filename,
                        directory = directory,
                        passphrase = passphrase,
                        convert2pdf = convert2pdf
                )

        #--------------------------------------------------------
        def display_via_mime(self, chunksize=0, block=None):

                # document part
                if self._payload['pk_doc_obj'] is not None:
                        return self.document_part.display_via_mime(chunksize = chunksize, block = block)

                # DIR entry
                if self._payload['filename'].startswith('DIR::'):
                        # FIXME: error handling with malformed entries
                        tag, node, path = self._payload['filename'].split('::', 2)
                        if node != platform.node():
                                msg = _(
                                        'This item points to a directory on the computer named:\n'
                                        ' %s\n'
                                        'You are, however, currently using another computer:\n'
                                        ' %s\n'
                                        'Directory items can only be viewed/saved/exported\n'
                                        'on the computer they are pointing to.'
                                ) % (node, platform.node())
                                return False, msg
                        success, msg = gmMimeLib.call_viewer_on_file(path, block = block)
                        return success, msg

                # data -> save
                fname = self.save_to_file(aChunkSize = chunksize)
                if fname is None:
                        return False, ''

                success, msg = gmMimeLib.call_viewer_on_file(fname, block = block)
                if not success:
                        return False, msg

                return True, ''

        #--------------------------------------------------------
        def get_useful_filename(self, patient=None, directory=None):
                patient_part = ''
                if patient is not None:
                        patient_part = '-%s' % patient.subdir_name

                # preserve original filename extension if available
                suffix = '.dat'
                if self._payload['filename'] is not None:
                        tmp, suffix = os.path.splitext (
                                gmTools.fname_sanitize(self._payload['filename']).casefold()
                        )
                        if suffix == '':
                                suffix = '.dat'
                fname = gmTools.get_unique_filename (
                        prefix = '%s-gm-export_item%s-' % (self._payload['list_position'], patient_part),
                        suffix = suffix,
                        tmp_dir = directory
                )
                return fname

        #--------------------------------------------------------
        # helpers
        #--------------------------------------------------------
        def __save_normal_item(self, filename:str=None, directory:str=None, passphrase:str=None, convert2pdf:bool=False) -> str:
                """Saves item to disk where item is a "normal" row in the export area.

                Item must not be a DIRENTRY nor a link pointing to a
                document part in the archive.

                Always eventually use the return value as the
                canonical filename for further processing and do not
                rely on <filename> being returned as-is. This may be
                due to <filename> not carrying a path (in which case
                <directory> is used), or the exported file being
                converted to PDF and/or encrypted which may change
                the file extension.

                Args:
                        filename: a target filename, or rather, a template for same
                        directory: target directory, when filename is not given, or does not include a path

                Returns:
                        The ultimate name of the file, saved, converted, and encrypted, as instructed.
                """
                #_log.debug('<filename> %s', filename)
                #_log.debug('<directory> %s', directory)
                tmp_fname = gmTools.get_unique_filename()
                _log.debug('temporary dump file: %s', tmp_fname)
                # make sure output file eventually ends up in dir-of-<filename> or in <directory>
                target_path = None
                if filename:
                        target_path = gmTools.fname_dir(filename)
                if not target_path:
                        target_path = directory
                if not target_path:
                        target_path = gmTools.fname_dir(tmp_fname)
                if target_path.strip() == '':
                        target_path = None
                _log.debug('target path: %s', target_path)
                SQL = 'SELECT substring(data FROM %(start)s FOR %(size)s) FROM clin.export_item WHERE pk = %(pk)s'
                success = gmPG2.bytea2file (
                        data_query = {'cmd': SQL, 'args': {'pk': self.pk_obj}},
                        filename = tmp_fname,
                        data_size = self._payload['size']
                )
                if not success:
                        return None

                tmp_fname = gmMimeLib.adjust_extension_by_mimetype(tmp_fname)
                #_log.debug('extension-adjusted temporary dump file: %s', tmp_fname)
                if convert2pdf:
                        pdf_fname = gmMimeLib.convert_file(filename = tmp_fname, target_mime = 'application/pdf', target_extension = '.pdf')
                        if not pdf_fname:
                                return None

                        tmp_fname = pdf_fname
                        _log.debug('converted to pdf: %s', tmp_fname)

                if not passphrase:
                        if filename:
                                target_fname = os.path.join(target_path, gmTools.fname_from_path(filename))
                                #_log.debug('target filename from passed in name: %s', target_fname)
                        else:
                                target_fname = self.get_useful_filename(directory = target_path)
                                #_log.debug('generated target filename: %s', target_fname)
                        if not gmTools.rename_file(tmp_fname, target_fname, overwrite = True, allow_symlink = True):
                                return None

                        ext = None
                        if filename:
                                ext = gmTools.fname_extension(filename)
                        if not ext:                     # either empty or filename not passed in
                                target_fname = gmMimeLib.adjust_extension_by_mimetype(target_fname)
                                _log.debug('extension-adjusted target file: %s', target_fname)
                        return target_fname

                enc_fname = gmCrypto.encrypt_file (
                        filename = tmp_fname,
                        passphrase = passphrase,
                        verbose = _cfg.get(option = 'debug'),
                        remove_unencrypted = True,
                        convert2pdf = False     # already done, if desired
                )
                #_log.debug('encrypted file: %s', enc_fname)
                removed = gmTools.remove_file(tmp_fname)
                if enc_fname is None:
                        _log.error('cannot encrypt or, possibly, convert')
                        return None

                if not removed:
                        _log.error('cannot remove unencrypted file')
                        gmTools.remove_file(enc_fname)
                        return None

                target_fname = os.path.join(target_path, gmTools.fname_from_path(enc_fname))
                if not gmTools.rename_file(enc_fname, target_fname, overwrite = True, allow_symlink = True):
                        gmTools.remove_file(enc_fname)
                        return None

                _log.debug('generated target filename: %s', target_fname)
                return target_fname

        #--------------------------------------------------------
        def __save_doc_obj(self, filename=None, directory=None, passphrase=None, convert2pdf:bool=False):
                """Save doc object part into target.

                None: not a doc obj
                True: success
                False: failure (to save/convert/encrypt/remove unencrypted)
                """
                if self._payload['pk_doc_obj'] is None:
                        return None

                part = self.document_part
                if not filename:
                        filename = part.get_useful_filename (
                                make_unique = False,
                                directory = directory,
                                include_gnumed_tag = False,
                                date_before_type = True,
                                name_first = False
                        )
                path, name = os.path.split(filename)
                filename = os.path.join(path, '%s-%s' % (self._payload['list_position'], name))
                if convert2pdf:
                        target_mime = 'application/pdf'
                        target_ext = '.pdf'
                else:
                        target_mime = None
                        target_ext = None
                part_fname = part.save_to_file (
                        filename = filename,
                        target_mime = target_mime,
                        target_extension = target_ext,
                        ignore_conversion_problems = False
                )
                if part_fname is None:
                        _log.error('cannot save document part to file')
                        return False

                if passphrase is None:
                        return part_fname

                enc_filename = gmCrypto.encrypt_file (
                        filename = part_fname,
                        passphrase = passphrase,
                        verbose = _cfg.get(option = 'debug'),
                        remove_unencrypted = True
                )
                removed = gmTools.remove_file(part_fname)
                if enc_filename is None:
                        _log.error('cannot encrypt')
                        return False

                if not removed:
                        _log.error('cannot remove unencrypted file')
                        gmTools.remove_file(enc_filename)
                        return None

                if not filename:
                        return enc_filename

                # make sure encrypted file ends up in dir-of-filename
                target_fname = os.path.join (
                        gmTools.fname_dir(filename),
                        gmTools.fname_from_path(enc_filename)
                )
                if not gmTools.rename_file(enc_filename, target_fname, overwrite = True, allow_symlink = True):
                        gmTools.remove_file(enc_filename)
                        return None

                return target_fname

        #--------------------------------------------------------
        def __save_direntry(self, directory=None, passphrase=None):
                """Move DIRENTRY source into target.

                None: not a DIRENTRY
                True: success
                False: failure
                """
                # do not process malformed entries
                try:
                        tag, node, local_fs_path = self._payload['filename'].split('::', 2)
                except ValueError:
                        _log.exception('malformed DIRENTRY: [%s]', self._payload['filename'])
                        return False
                # source and target paths must not overlap
                if directory is None:
                        directory = gmTools.mk_sandbox_dir(prefix = 'exp-')
                if directory.startswith(local_fs_path):
                        _log.error('cannot dump DIRENTRY item [%s]: must not be subdirectory of target dir [%s]', self._payload['filename'], directory)
                        return False
                if local_fs_path.startswith(directory):
                        _log.error('cannot dump DIRENTRY item [%s]: target dir [%s] must not be subdirectory of DIRENTRY', self._payload['filename'], directory)
                        return False

                _log.debug('dumping DIRENTRY item [%s] into [%s]', self._payload['filename'], directory)
                sandbox_dir = gmTools.mk_sandbox_dir()
                _log.debug('sandbox: %s', sandbox_dir)
                tmp = gmTools.copy_tree_content(local_fs_path, sandbox_dir)
                if tmp is None:
                        _log.error('cannot dump DIRENTRY item [%s] into [%s]: copy error', self._payload['filename'], sandbox_dir)
                        return False

                gmTools.remove_file(os.path.join(tmp, DIRENTRY_README_NAME))
                if passphrase is not None:
                        _log.debug('encrypting sandbox: %s', sandbox_dir)
                        encrypted = gmCrypto.encrypt_directory_content (
                                directory = sandbox_dir,
                                passphrase = passphrase,
                                verbose = _cfg.get(option = 'debug'),
                                remove_unencrypted = True
                        )
                        if not encrypted:
                                _log.error('cannot dump DIRENTRY item [%s]: encryption problem in [%s]', self._payload['filename'], sandbox_dir)
                                return False

                tmp = gmTools.copy_tree_content(sandbox_dir, directory)
                if tmp is None:
                        _log.debug('cannot dump DIRENTRY item [%s] into [%s]: copy error', self._payload['filename'], directory)
                        return False

                return directory

        #--------------------------------------------------------
        # properties
        #--------------------------------------------------------
        def _get_is_doc_part(self):
                return self._payload['pk_doc_obj'] is not None

        is_document_part = property(_get_is_doc_part)

        #--------------------------------------------------------
        def _get_doc_part(self):
                if self._payload['pk_doc_obj'] is None:
                        return None
                return gmDocuments.cDocumentPart(aPK_obj = self._payload['pk_doc_obj'])

        document_part = property(_get_doc_part)

        #--------------------------------------------------------
        def _get_is_print_job(self):
                return self._payload['designation'] == PRINT_JOB_DESIGNATION

        def _set_is_print_job(self, is_print_job):
                desig = gmTools.bool2subst(is_print_job, PRINT_JOB_DESIGNATION, None, None)
                if self._payload['designation'] == desig:
                        return
                self['designation'] = desig
                self.save()

        is_print_job = property(_get_is_print_job, _set_is_print_job)

        #--------------------------------------------------------
        def _is_DIRENTRY(self):
                """Check whether this item looks like a DIRENTRY."""
                if self._payload['filename'] is None:
                        return False
                if not self._payload['filename'].startswith('DIR::'):
                        return False
                if len(self._payload['filename'].split('::', 2)) != 3:
                        _log.exception('DIRENTRY [%s]: malformed', self._payload['filename'])
                        return False
                return True

        is_DIRENTRY = property(_is_DIRENTRY)

        #--------------------------------------------------------
        def _is_local_DIRENTRY(self):
                """Check whether this item is a _local_ DIRENTRY."""
                if not self.is_DIRENTRY:
                        return False

                tag, node, local_fs_path = self._payload['filename'].split('::', 2)
                if node == platform.node():
                        return True

                _log.warning('DIRENTRY [%s]: not on this machine (%s)', self._payload['filename'], platform.node())
                return False

        is_local_DIRENTRY = property(_is_local_DIRENTRY)

        #--------------------------------------------------------
        def _is_valid_DIRENTRY(self):
                """Check whether this item is a _valid_ DIRENTRY."""
                if not self.is_local_DIRENTRY:
                        return False

                tag, node, local_fs_path = self._payload['filename'].split('::', 2)
                # valid path ?
                if not os.path.isdir(local_fs_path):
                        _log.warning('DIRENTRY [%s]: directory not found (old DIRENTRY ?)', self._payload['filename'])
                        return False
                return True

        is_valid_DIRENTRY = property(_is_valid_DIRENTRY)

        #--------------------------------------------------------
        def _get_DIRENTRY_node(self):
                if not self.is_DIRENTRY:
                        return None

                try:
                        tag, node, local_fs_path = self._payload['filename'].split('::', 2)
                except ValueError:
                        # should not happen because structure already checked in .is_DIRENTRY,
                        # better safe than sorry
                        _log.exception('DIRENTRY [%s]: malformed', self._payload['filename'])
                        return None

                return node

        DIRENTRY_node = property(_get_DIRENTRY_node)

        #--------------------------------------------------------
        def _get_DIRENTRY_path(self):
                if not self.is_DIRENTRY:
                        return None

                try:
                        tag, node, local_fs_path = self._payload['filename'].split('::', 2)
                except ValueError:
                        # should not happen because structure already checked in .is_DIRENTRY,
                        # better safe than sorry
                        _log.exception('DIRENTRY [%s]: malformed', self._payload['filename'])
                        return None

                return local_fs_path

        DIRENTRY_path = property(_get_DIRENTRY_path)

        #--------------------------------------------------------
        def _is_DICOM_directory(self):
                """Check whether this item points to a DICOMDIR."""
                if not self.is_valid_DIRENTRY:
                        return False

                tag, node, local_fs_path = self._payload['filename'].split('::', 2)
                found_DICOMDIR = False
                for fs_entry in os.listdir(local_fs_path):
                        # found a subdir
                        if os.path.isdir(os.path.join(local_fs_path, fs_entry)):
                                # allow for any number of subdirs
                                continue
                        # found a file
                        if fs_entry != 'DICOMDIR':
                                # not named "DICOMDIR" -> not a DICOMDIR DIRENTRY
                                return False

                        # must be named DICOMDIR -> that's the only file allowed (and required) in ./
                        found_DICOMDIR = True
                return found_DICOMDIR

        is_DICOM_directory = property(_is_DICOM_directory)

        #--------------------------------------------------------
        def _has_files_in_root(self):
                """True if there are files in the root directory."""
                tag, node, local_fs_path = self._payload['filename'].split('::', 2)
                for fs_entry in os.listdir(local_fs_path):
                        if os.path.isfile(fs_entry):
                                _log.debug('has files in top level: %s', local_fs_path)
                                return True
                return False

        has_files_in_root = property(_has_files_in_root)

Ancestors

Instance variables

var DIRENTRY_node
Expand source code
def _get_DIRENTRY_node(self):
        if not self.is_DIRENTRY:
                return None

        try:
                tag, node, local_fs_path = self._payload['filename'].split('::', 2)
        except ValueError:
                # should not happen because structure already checked in .is_DIRENTRY,
                # better safe than sorry
                _log.exception('DIRENTRY [%s]: malformed', self._payload['filename'])
                return None

        return node
var DIRENTRY_path
Expand source code
def _get_DIRENTRY_path(self):
        if not self.is_DIRENTRY:
                return None

        try:
                tag, node, local_fs_path = self._payload['filename'].split('::', 2)
        except ValueError:
                # should not happen because structure already checked in .is_DIRENTRY,
                # better safe than sorry
                _log.exception('DIRENTRY [%s]: malformed', self._payload['filename'])
                return None

        return local_fs_path
var document_part
Expand source code
def _get_doc_part(self):
        if self._payload['pk_doc_obj'] is None:
                return None
        return gmDocuments.cDocumentPart(aPK_obj = self._payload['pk_doc_obj'])
var has_files_in_root

True if there are files in the root directory.

Expand source code
def _has_files_in_root(self):
        """True if there are files in the root directory."""
        tag, node, local_fs_path = self._payload['filename'].split('::', 2)
        for fs_entry in os.listdir(local_fs_path):
                if os.path.isfile(fs_entry):
                        _log.debug('has files in top level: %s', local_fs_path)
                        return True
        return False
var is_DICOM_directory

Check whether this item points to a DICOMDIR.

Expand source code
def _is_DICOM_directory(self):
        """Check whether this item points to a DICOMDIR."""
        if not self.is_valid_DIRENTRY:
                return False

        tag, node, local_fs_path = self._payload['filename'].split('::', 2)
        found_DICOMDIR = False
        for fs_entry in os.listdir(local_fs_path):
                # found a subdir
                if os.path.isdir(os.path.join(local_fs_path, fs_entry)):
                        # allow for any number of subdirs
                        continue
                # found a file
                if fs_entry != 'DICOMDIR':
                        # not named "DICOMDIR" -> not a DICOMDIR DIRENTRY
                        return False

                # must be named DICOMDIR -> that's the only file allowed (and required) in ./
                found_DICOMDIR = True
        return found_DICOMDIR
var is_DIRENTRY

Check whether this item looks like a DIRENTRY.

Expand source code
def _is_DIRENTRY(self):
        """Check whether this item looks like a DIRENTRY."""
        if self._payload['filename'] is None:
                return False
        if not self._payload['filename'].startswith('DIR::'):
                return False
        if len(self._payload['filename'].split('::', 2)) != 3:
                _log.exception('DIRENTRY [%s]: malformed', self._payload['filename'])
                return False
        return True
var is_document_part
Expand source code
def _get_is_doc_part(self):
        return self._payload['pk_doc_obj'] is not None
var is_local_DIRENTRY

Check whether this item is a local DIRENTRY.

Expand source code
def _is_local_DIRENTRY(self):
        """Check whether this item is a _local_ DIRENTRY."""
        if not self.is_DIRENTRY:
                return False

        tag, node, local_fs_path = self._payload['filename'].split('::', 2)
        if node == platform.node():
                return True

        _log.warning('DIRENTRY [%s]: not on this machine (%s)', self._payload['filename'], platform.node())
        return False
var is_print_job
Expand source code
def _get_is_print_job(self):
        return self._payload['designation'] == PRINT_JOB_DESIGNATION
var is_valid_DIRENTRY

Check whether this item is a valid DIRENTRY.

Expand source code
def _is_valid_DIRENTRY(self):
        """Check whether this item is a _valid_ DIRENTRY."""
        if not self.is_local_DIRENTRY:
                return False

        tag, node, local_fs_path = self._payload['filename'].split('::', 2)
        # valid path ?
        if not os.path.isdir(local_fs_path):
                _log.warning('DIRENTRY [%s]: directory not found (old DIRENTRY ?)', self._payload['filename'])
                return False
        return True

Methods

def display_via_mime(self, chunksize=0, block=None)
Expand source code
def display_via_mime(self, chunksize=0, block=None):

        # document part
        if self._payload['pk_doc_obj'] is not None:
                return self.document_part.display_via_mime(chunksize = chunksize, block = block)

        # DIR entry
        if self._payload['filename'].startswith('DIR::'):
                # FIXME: error handling with malformed entries
                tag, node, path = self._payload['filename'].split('::', 2)
                if node != platform.node():
                        msg = _(
                                'This item points to a directory on the computer named:\n'
                                ' %s\n'
                                'You are, however, currently using another computer:\n'
                                ' %s\n'
                                'Directory items can only be viewed/saved/exported\n'
                                'on the computer they are pointing to.'
                        ) % (node, platform.node())
                        return False, msg
                success, msg = gmMimeLib.call_viewer_on_file(path, block = block)
                return success, msg

        # data -> save
        fname = self.save_to_file(aChunkSize = chunksize)
        if fname is None:
                return False, ''

        success, msg = gmMimeLib.call_viewer_on_file(fname, block = block)
        if not success:
                return False, msg

        return True, ''
def get_useful_filename(self, patient=None, directory=None)
Expand source code
def get_useful_filename(self, patient=None, directory=None):
        patient_part = ''
        if patient is not None:
                patient_part = '-%s' % patient.subdir_name

        # preserve original filename extension if available
        suffix = '.dat'
        if self._payload['filename'] is not None:
                tmp, suffix = os.path.splitext (
                        gmTools.fname_sanitize(self._payload['filename']).casefold()
                )
                if suffix == '':
                        suffix = '.dat'
        fname = gmTools.get_unique_filename (
                prefix = '%s-gm-export_item%s-' % (self._payload['list_position'], patient_part),
                suffix = suffix,
                tmp_dir = directory
        )
        return fname
def save_to_file(self, aChunkSize: int = 0, filename: str = None, directory: str = None, passphrase: str = None, convert2pdf: bool = False) ‑> str

Save this entry to disk.

Args

filename
The filename to save into, unless this is a DIRENTRY.
directory
The directory to save the DIRENTRY _content into, if this is a DIRENTRY.
passphrase
Encrypt file(s) with this passphrase, if not None.
convert2pdf
Convert file(s) to PDF on the way out. Before encryption, that is.

Returns

Directory for DIRENTRIES, or filename, or None on failure.

Expand source code
def save_to_file(self, aChunkSize:int=0, filename:str=None, directory:str=None, passphrase:str=None, convert2pdf:bool=False) -> str:
        """Save this entry to disk.

        Args:
                filename: The filename to save into, unless this is a DIRENTRY.
                directory: The directory to save the DIRENTRY _content into, if this is a DIRENTRY.
                passphrase: Encrypt file(s) with this passphrase, if not _None_.
                convert2pdf: Convert file(s) to PDF on the way out. Before encryption, that is.

        Returns:
                Directory for DIRENTRIES, or filename, or None on failure.
        """
        if self.is_DIRENTRY and convert2pdf:
                # cannot convert dir entries to PDF
                return None

        # data linked from archive ?
        part_fname = self.__save_doc_obj (
                filename = filename,
                directory = directory,
                passphrase = passphrase,
                convert2pdf = convert2pdf
        )
        if part_fname is False:
                return None

        if part_fname is not None:
                return part_fname

        # valid DIRENTRY ?
        if self.is_valid_DIRENTRY:
                target_dir = self.__save_direntry (
                        directory,
                        passphrase = passphrase
                )
                if target_dir is False:
                        return None

                if target_dir is not None:
                        return target_dir

        # but still a DIRENTRY ?
        if self.is_DIRENTRY:
                # yes, but apparently not valid (anymore), say,
                # other machine or dir not found locally
                return None

        # normal item with data in export area table
        return self.__save_normal_item (
                filename = filename,
                directory = directory,
                passphrase = passphrase,
                convert2pdf = convert2pdf
        )
def update_data(self, data=None)
Expand source code
def update_data(self, data=None):
        assert (data is not None), '<data> must not be <None>'

        if self.is_DIRENTRY or self.is_document_part:
                return False

        SQL = """
                UPDATE clin.export_item SET
                        data = %(data)s::bytea,
                        fk_doc_obj = NULL
                WHERE pk = %(pk)s"""
        args = {'pk': self.pk_obj, 'data': data}
        gmPG2.run_rw_queries(queries = [{'cmd': SQL, 'args': args}], return_data = False)
        # must update XMIN now ...
        self.refetch_payload()
        return True
def update_data_from_file(self, filename=None, convert_document_part=False)
Expand source code
def update_data_from_file(self, filename=None, convert_document_part=False):

        if self.is_DIRENTRY:
                return False

        if self.is_document_part:
                if not convert_document_part:
                        return False

        # sanity check
        if not (os.access(filename, os.R_OK) and os.path.isfile(filename)):
                _log.error('[%s] is not a readable file' % filename)
                return False

        cmd = """
                UPDATE clin.export_item SET
                        data = %(data)s::bytea,
                        fk_doc_obj = NULL,
                        filename = gm.nullify_empty_string(%(fname)s)
                WHERE pk = %(pk)s"""
        args = {'pk': self.pk_obj, 'fname': filename}
        if not gmPG2.file2bytea(query = cmd, filename = filename, args = args):
                return False

        # must update XMIN now ...
        self.refetch_payload()
        return True

Inherited members