Module Gnumed.business.gmHL7
Some HL7 handling.
# -*- coding: utf-8 -*-
"""Some HL7 handling."""
#============================================================
__author__ = "K.Hilbert <Karsten.Hilbert@gmx.net>"
__license__ = "GPL v2 or later"
import sys
import os
import logging
import time
import shutil
import datetime as pyDT
import hl7 as pyhl7
from xml.etree import ElementTree as pyxml
if __name__ == '__main__':
sys.path.insert(0, '../../')
_ = lambda x:x
from Gnumed.pycommon import gmI18N
if __name__ == '__main__':
gmI18N.activate_locale()
gmI18N.install_domain()
from Gnumed.pycommon import gmTools
from Gnumed.pycommon import gmPG2
from Gnumed.pycommon import gmDateTime
from Gnumed.business import gmIncomingData
from Gnumed.business import gmPathLab
from Gnumed.business import gmPerson
from Gnumed.business import gmPraxis
from Gnumed.business import gmStaff
_log = logging.getLogger('gm.hl7')
# constants
HL7_EOL = '\r'
HL7_BRK = '\.br\\'
HL7_SEGMENTS = 'FHS BHS MSH PID PV1 OBX NTE ORC OBR'.split()
HL7_segment2field_count = {
'FHS': 12,
'BHS': 12,
'MSH': 19,
'PID': 30,
'PV1': 52,
'OBR': 43,
'OBX': 17,
'NTE': 3,
'ORC': 19
}
MSH_field__sending_lab = 3
PID_field__name = 5
PID_field__dob = 7
PID_field__gender = 8
PID_component__lastname = 1
PID_component__firstname = 2
PID_component__middlename = 3
OBR_field__service_name = 4
OBR_field__ts_requested = 6
OBR_field__ts_started = 7
OBR_field__ts_ended = 8
OBR_field__ts_specimen_received = 14
OBX_field__set_id = 1
OBX_field__datatype = 2
OBX_field__type = 3
# components of 3rd field:
OBX_component__loinc = 1
OBX_component__name = 2
OBX_field__subid = 4
OBX_field__value = 5
OBX_field__unit = 6
OBX_field__range = 7
OBX_field__abnormal_flag = 8
OBX_field__status = 11
OBX_field__timestamp = 14
NET_field__set_id = 1
NET_field__src = 2
NET_field__note = 3
HL7_field_labels = {
'MSH': {
0: 'Segment Type',
1: 'Field Separator',
2: 'Encoding Characters',
3: 'Sending Application',
4: 'Sending Facility',
5: 'Receiving Application',
6: 'Receiving Facility',
7: 'Date/Time of Message',
8: 'Security',
9: 'Message Type',
10: 'ID: Message Control',
11: 'ID: Processing',
12: 'ID: Version',
14: 'Continuation Pointer',
15: 'Accept Acknowledgement Type',
16: 'Application Acknowledgement Type'
},
'PID': {
0: 'Segment Type',
1: '<PID> Set ID',
2: 'Patient ID (external)',
3: 'Patient ID (internal)',
4: 'Patient ID (alternate)',
5: 'Patient Name',
7: 'Date/Time of birth',
8: 'Administrative Gender',
11: 'Patient Address',
13: 'Patient Phone Number - Home'
},
'OBR': {
0: 'Segment Type',
1: 'ID: Set',
3: 'Filler Order Number (= ORC-3)',
4: 'ID: Universal Service',
5: 'Priority',
6: 'Date/Time requested',
7: 'Date/Time Observation started',
14: 'Date/Time Specimen received',
16: 'Ordering Provider',
18: 'Placer Field 1',
20: 'Filler Field 1',
21: 'Filler Field 2',
22: 'Date/Time Results reported/Status changed',
24: 'ID: Diagnostic Service Section',
25: 'Result Status',
27: 'Quantity/Timing',
28: 'Result Copies To'
},
'ORC': {
0: 'Segment Type',
1: 'Order Control',
3: 'Filler Order Number',
12: 'Ordering Provider'
},
'OBX': {
0: 'Segment Type',
1: 'Set ID',
2: 'Value Type',
3: 'Identifier (LOINC)',
4: 'Observation Sub-ID',
5: 'Value',
6: 'Units',
7: 'References Range (Low - High)',
8: 'Abnormal Flags',
11: 'Result Status',
14: 'Date/Time of Observation'
},
'NTE': {
0: 'Segment Type',
3: 'Comment'
}
}
HL7_GENDERS = {
'F': 'f',
'M': 'm',
'O': None,
'U': None,
None: None
}
#============================================================
# public API
#============================================================
def extract_HL7_from_XML_CDATA(filename, xml_path, target_dir=None):
_log.debug('extracting HL7 from CDATA of <%s> nodes in XML file [%s]', xml_path, filename)
# sanity checks/setup
try:
open(filename).close()
orig_dir = os.path.split(filename)[0]
work_filename = gmTools.get_unique_filename(prefix = 'gm-x2h-%s-' % gmTools.fname_stem(filename), suffix = '.hl7')
if target_dir is None:
target_dir = os.path.join(orig_dir, 'HL7')
done_dir = os.path.join(orig_dir, 'done')
else:
done_dir = os.path.join(target_dir, 'done')
_log.debug('target dir: %s', target_dir)
gmTools.mkdir(target_dir)
gmTools.mkdir(done_dir)
except Exception:
_log.exception('cannot setup unwrapping environment')
return None
hl7_xml = pyxml.ElementTree()
try:
hl7_xml.parse(filename)
except pyxml.ParseError:
_log.exception('cannot parse [%s]' % filename)
return None
nodes = hl7_xml.findall(xml_path)
if len(nodes) == 0:
_log.debug('no nodes found for data extraction')
return None
_log.debug('unwrapping HL7 from XML into [%s]', work_filename)
hl7_file = open(work_filename, mode = 'wt', encoding = 'utf8', newline = '') # universal newlines acceptance but no translation on output
for node in nodes:
# hl7_file.write(node.text.rstrip() + HL7_EOL)
hl7_file.write(node.text + '') # trick to make node.text unicode
hl7_file.close()
target_fname = os.path.join(target_dir, os.path.split(work_filename)[1])
shutil.copy(work_filename, target_dir)
shutil.move(filename, done_dir)
return target_fname
#------------------------------------------------------------
def split_hl7_file(filename, target_dir=None, encoding='utf8'):
"""Multi-step processing of HL7 files.
- input can be multi-MSH / multi-PID / partially malformed HL7
- tries to fix oddities
- splits by MSH
- splits by PID into <target_dir>
- needs write permissions in dir_of(filename)
- moves HL7 files which were successfully split up into dir_of(filename)/done/
- returns (True|False, list_of_PID_files)
"""
local_log_name = gmTools.get_unique_filename (
prefix = gmTools.fname_stem(filename) + '-',
suffix = '.split.log'
)
local_logger = logging.FileHandler(local_log_name)
local_logger.setLevel(logging.DEBUG)
root_logger = logging.getLogger('')
root_logger.addHandler(local_logger)
_log.info('splitting HL7 file: %s', filename)
_log.debug('log file: %s', local_log_name)
# sanity checks/setup
try:
open(filename).close()
orig_dir = os.path.split(filename)[0]
done_dir = os.path.join(orig_dir, 'done')
gmTools.mkdir(done_dir)
error_dir = os.path.join(orig_dir, 'failed')
gmTools.mkdir(error_dir)
work_filename = gmTools.get_unique_filename(prefix = gmTools.fname_stem(filename) + '-', suffix = '.hl7')
if target_dir is None:
target_dir = os.path.join(orig_dir, 'PID')
_log.debug('target dir: %s', target_dir)
gmTools.mkdir(target_dir)
except Exception:
_log.exception('cannot setup splitting environment')
root_logger.removeHandler(local_logger)
return False, None
# split
target_names = []
try:
shutil.copy(filename, work_filename)
fixed_filename = __fix_malformed_hl7_file(work_filename, encoding = encoding)
MSH_fnames = __split_hl7_file_by_MSH(fixed_filename, encoding)
PID_fnames = []
for MSH_fname in MSH_fnames:
PID_fnames.extend(__split_MSH_by_PID(MSH_fname))
for PID_fname in PID_fnames:
shutil.move(PID_fname, target_dir)
target_names.append(os.path.join(target_dir, os.path.split(PID_fname)[1]))
except Exception:
_log.exception('cannot split HL7 file')
for target_name in target_names:
try: os.remove(target_name)
except Exception: pass
root_logger.removeHandler(local_logger)
shutil.move(local_log_name, error_dir)
return False, None
_log.info('successfully split')
root_logger.removeHandler(local_logger)
try:
shutil.move(filename, done_dir)
shutil.move(local_log_name, done_dir)
except shutil.Error:
_log.exception('cannot move hl7 file or log file to holding area')
return True, target_names
#------------------------------------------------------------
def format_hl7_message(message=None, skip_empty_fields=True, eol='\n ', source=None):
# a segment is a line starting with a type
msg = pyhl7.parse(message)
output = []
if source is not None:
output.append([_('HL7 Source'), '%s' % source])
output.append([_('HL7 data size'), _('%s bytes') % len(message)])
output.append([_('HL7 Message'), _(' %s segments (lines)%s') % (len(msg), gmTools.bool2subst(skip_empty_fields, _(', skipping empty fields'), ''))])
max_len = 0
for seg_idx in range(len(msg)):
seg = msg[seg_idx]
seg_type = seg[0][0]
output.append([_('Segment #%s <%s>') % (seg_idx, seg_type), _('%s fields') % len(seg)])
for field_idx in range(len(seg)):
field = seg[field_idx]
try:
label = HL7_field_labels[seg_type][field_idx]
except KeyError:
label = _('HL7 %s field') % seg_type
max_len = max(max_len, len(label))
if len(field) == 0:
if not skip_empty_fields:
					output.append(['%2s - %s' % (field_idx, label), _('<EMPTY>')])
continue
if (len(field) == 1) and (('%s' % field[0]).strip() == ''):
if not skip_empty_fields:
					output.append(['%2s - %s' % (field_idx, label), _('<EMPTY>')])
continue
content_lines = ('%s' % field).split(HL7_BRK)
output.append(['%2s - %s' % (field_idx, label), content_lines[0]])
for line in content_lines[1:]:
output.append(['', line])
#output.append([u'%2s - %s' % (field_idx, label), u'%s' % field])
if eol is None:
return output
max_len += 7
return eol.join([ '%s: %s' % ((o[0] + (' ' * max_len))[:max_len], o[1]) for o in output ])
#------------------------------------------------------------
def format_hl7_file(filename, skip_empty_fields=True, eol='\n ', return_filename=False, fix_hl7=True):
if fix_hl7:
fixed_name = __fix_malformed_hl7_file(filename)
hl7_file = open(fixed_name, mode = 'rt', encoding = 'utf-8-sig', newline = '') # read universal but pass on untranslated
source = '%s (<- %s)' % (fixed_name, filename)
else:
hl7_file = open(filename, mode = 'rt', encoding = 'utf-8-sig', newline = '') # read universal but pass on untranslated
source = filename
output = format_hl7_message (
message = hl7_file.read(1024 * 1024 * 5), # 5 MB max
skip_empty_fields = skip_empty_fields,
eol = eol,
source = source
)
hl7_file.close()
if not return_filename:
return output
max_len = 120
if eol is None:
output = '\n '.join([ '%s: %s' % ((o[0] + (' ' * max_len))[:max_len], o[1]) for o in output ])
out_name = gmTools.get_unique_filename(prefix = 'gm-formatted_hl7-', suffix = '.hl7')
out_file = open(out_name, mode = 'wt', encoding = 'utf8')
out_file.write(output)
out_file.close()
return out_name
#------------------------------------------------------------
# this is used in the main code:
def stage_single_PID_hl7_file(filename, source=None, encoding='utf8'):
"""Multi-step processing of HL7 files.
- input must be single-MSH / single-PID / normalized HL7
- imports into clin.incoming_data
- needs write permissions in dir_of(filename)
- moves PID files which were successfully staged into dir_of(filename)/done/PID/
"""
local_log_name = gmTools.get_unique_filename (
prefix = gmTools.fname_stem(filename) + '-',
suffix = '.stage.log'
)
local_logger = logging.FileHandler(local_log_name)
local_logger.setLevel(logging.DEBUG)
root_logger = logging.getLogger('')
root_logger.addHandler(local_logger)
_log.info('staging [%s] as unmatched incoming HL7%s', filename, gmTools.coalesce(source, '', ' (%s)'))
_log.debug('log file: %s', local_log_name)
# sanity checks/setup
try:
open(filename).close()
orig_dir = os.path.split(filename)[0]
done_dir = os.path.join(orig_dir, 'done')
gmTools.mkdir(done_dir)
error_dir = os.path.join(orig_dir, 'failed')
gmTools.mkdir(error_dir)
except Exception:
_log.exception('cannot setup staging environment')
root_logger.removeHandler(local_logger)
return False
# stage
try:
incoming = gmIncomingData.create_incoming_data('HL7%s' % gmTools.coalesce(source, '', ' (%s)'), filename)
if incoming is None:
_log.error('cannot stage PID file: %s', filename)
root_logger.removeHandler(local_logger)
shutil.move(filename, error_dir)
shutil.move(local_log_name, error_dir)
return False
incoming.update_data_from_file(fname = filename)
except Exception:
_log.exception('error staging PID file')
root_logger.removeHandler(local_logger)
shutil.move(filename, error_dir)
shutil.move(local_log_name, error_dir)
return False
# set additional data
MSH_file = open(filename, mode = 'rt', encoding = 'utf-8-sig', newline = '')
raw_hl7 = MSH_file.read(1024 * 1024 * 5) # 5 MB max
MSH_file.close()
shutil.move(filename, done_dir)
incoming['comment'] = format_hl7_message (
message = raw_hl7,
skip_empty_fields = True,
eol = '\n'
)
HL7 = pyhl7.parse(raw_hl7)
del raw_hl7
incoming['comment'] += '\n'
incoming['comment'] += ('-' * 80)
incoming['comment'] += '\n\n'
log = open(local_log_name, mode = 'rt', encoding = 'utf-8-sig')
incoming['comment'] += log.read()
log.close()
try:
incoming['lastnames'] = HL7.extract_field('PID', segment_num = 1, field_num = PID_field__name, component_num = PID_component__lastname)
incoming['firstnames'] = HL7.extract_field('PID', segment_num = 1, field_num = PID_field__name, component_num = PID_component__firstname)
val = HL7.extract_field('PID', segment_num = 1, field_num = PID_field__name, component_num = PID_component__middlename)
if val is not None:
incoming['firstnames'] += ' '
incoming['firstnames'] += val
val = HL7.extract_field('PID', segment_num = 1, field_num = PID_field__dob)
if val is not None:
tmp = time.strptime(val, '%Y%m%d')
incoming['dob'] = pyDT.datetime(tmp.tm_year, tmp.tm_mon, tmp.tm_mday, tzinfo = gmDateTime.gmCurrentLocalTimezone)
val = HL7.extract_field('PID', segment_num = 1, field_num = PID_field__gender)
if val is not None:
incoming['gender'] = val
incoming['external_data_id'] = filename
#u'fk_patient_candidates',
# u'request_id', # request ID as found in <data>
# u'postcode',
# u'other_info', # other identifying info in .data
# u'requestor', # Requestor of data (e.g. who ordered test results) if available in source data.
# u'fk_identity',
# u'comment', # a free text comment on this row, eg. why is it here, error logs etc
# u'fk_provider_disambiguated' # The provider the data is relevant to.
except Exception:
_log.exception('cannot add more data')
incoming.save()
_log.info('successfully staged')
root_logger.removeHandler(local_logger)
shutil.move(local_log_name, done_dir)
return True
#------------------------------------------------------------
def process_staged_single_PID_hl7_file(staged_item):
log_name = gmTools.get_unique_filename (
prefix = 'gm-staged_hl7_import-',
suffix = '.log'
)
import_logger = logging.FileHandler(log_name)
import_logger.setLevel(logging.DEBUG)
root_logger = logging.getLogger('')
root_logger.addHandler(import_logger)
_log.debug('log file: %s', log_name)
if not staged_item.lock():
_log.error('cannot lock staged data for HL7 import')
root_logger.removeHandler(import_logger)
return False, log_name
_log.debug('reference ID of staged HL7 data: %s', staged_item['external_data_id'])
filename = staged_item.save_to_file()
_log.debug('unstaged HL7 data into: %s', filename)
if staged_item['pk_identity'] is None:
emr = None
else:
emr = gmPerson.cPatient(staged_item['pk_identity']).emr
success = False
try:
success = __import_single_PID_hl7_file(filename, emr = emr)
if success:
gmIncomingData.delete_incoming_data(pk_incoming_data = staged_item['pk_incoming_data'])
staged_item.unlock()
root_logger.removeHandler(import_logger)
return True, log_name
_log.error('error when importing single-PID/single-MSH file')
except Exception:
_log.exception('error when importing single-PID/single-MSH file')
if not success:
staged_item['comment'] = _('failed import: %s\n') % gmDateTime.pydt_strftime(gmDateTime.pydt_now_here())
staged_item['comment'] += '\n'
staged_item['comment'] += ('-' * 80)
staged_item['comment'] += '\n\n'
log = open(log_name, mode = 'rt', encoding = 'utf-8-sig')
staged_item['comment'] += log.read()
log.close()
staged_item['comment'] += '\n'
staged_item['comment'] += ('-' * 80)
staged_item['comment'] += '\n\n'
staged_item['comment'] += format_hl7_file (
filename,
skip_empty_fields = True,
eol = '\n ',
return_filename = False
)
staged_item.save()
staged_item.unlock()
root_logger.removeHandler(import_logger)
return success, log_name
#------------------------------------------------------------
def import_single_PID_hl7_file(filename):
log_name = '%s.import.log' % filename
import_logger = logging.FileHandler(log_name)
import_logger.setLevel(logging.DEBUG)
root_logger = logging.getLogger('')
root_logger.addHandler(import_logger)
_log.debug('log file: %s', log_name)
success = True
try:
success = __import_single_PID_hl7_file(filename)
if not success:
_log.error('error when importing single-PID/single-MSH file')
except Exception:
_log.exception('error when importing single-PID/single-MSH file')
root_logger.removeHandler(import_logger)
return success, log_name
#============================================================
# internal helpers
#============================================================
def __fix_malformed_hl7_file(filename, encoding='utf8'):
_log.debug('fixing HL7 file [%s]', filename)
# first pass:
# - remove empty lines
# - normalize line endings
# - unwrap wrapped segments (based on the assumption that segments are wrapped until a line starts with a known segment marker)
out1_fname = gmTools.get_unique_filename (
prefix = 'gm_fix1-%s-' % gmTools.fname_stem(filename),
suffix = '.hl7'
)
hl7_in = open(filename, mode = 'rt', encoding = encoding) # universal newlines: translate any type of EOL to \n
hl7_out = open(out1_fname, mode = 'wt', encoding = 'utf8', newline = '') # newline='' -> no translation of EOL at all
is_first_line = True
for line in hl7_in:
# skip empty line
if line.strip() == '':
continue
# starts with known segment ?
segment = line[:3]
if (segment in HL7_SEGMENTS) and (line[3] == '|'):
if not is_first_line:
hl7_out.write(HL7_EOL)
else:
is_first_line = False
else:
hl7_out.write(' ')
hl7_out.write(line.rstrip())
hl7_out.write(HL7_EOL)
hl7_out.close()
hl7_in.close()
# second pass:
# - normalize # of fields per line
	# - remove '\.br\\'-only fields ;-)
out2_fname = gmTools.get_unique_filename (
prefix = 'gm_fix2-%s-' % gmTools.fname_stem(filename),
suffix = '.hl7'
)
# we can now _expect_ lines to end in HL7_EOL, anything else is an error
hl7_in = open(out1_fname, mode = 'rt', encoding = 'utf-8-sig', newline = HL7_EOL)
hl7_out = open(out2_fname, mode = 'wt', encoding = 'utf8', newline = '')
for line in hl7_in:
line = line.strip()
seg_type = line[:3] # assumption: field separator = '|'
field_count = line.count('|') + 1 # assumption: no '|' in data ...
try:
required_fields = HL7_segment2field_count[seg_type]
except KeyError:
required_fields = field_count
missing_fields_count = required_fields - field_count
if missing_fields_count > 0:
line += ('|' * missing_fields_count)
cleaned_fields = []
for field in line.split('|'):
if field.replace(HL7_BRK, '').strip() == '':
cleaned_fields.append('')
continue
cleaned = gmTools.strip_prefix(field, HL7_BRK, remove_repeats = True, remove_whitespace = True)
cleaned = gmTools.strip_suffix(cleaned, HL7_BRK, remove_repeats = True, remove_whitespace = True)
cleaned_fields.append(cleaned)
hl7_out.write('|'.join(cleaned_fields) + HL7_EOL)
hl7_out.close()
hl7_in.close()
# third pass:
# - unsplit same-name, same-time, text-type OBX segments
out3_fname = gmTools.get_unique_filename (
prefix = 'gm_fix3-%s-' % gmTools.fname_stem(filename),
suffix = '.hl7'
)
# we can now _expect_ lines to end in HL7_EOL, anything else is an error
hl7_in = open(out2_fname, mode = 'rt', encoding = 'utf-8-sig', newline = HL7_EOL)
hl7_out = open(out3_fname, mode = 'wt', encoding = 'utf8', newline = '')
prev_identity = None
prev_fields = None
for line in hl7_in:
line = line.strip()
if not line.startswith('OBX|'):
if prev_fields is not None:
hl7_out.write('|'.join(prev_fields) + HL7_EOL)
hl7_out.write(line + HL7_EOL)
prev_identity = None
prev_fields = None
curr_fields = None
continue
# first OBX
curr_fields = line.split('|')
if curr_fields[OBX_field__datatype] != 'FT':
hl7_out.write(line + HL7_EOL)
prev_identity = None
prev_fields = None
curr_fields = None
continue
# first FT type OBX
if prev_fields is None:
prev_fields = line.split('|')
prev_identity = line.split('|')
prev_identity[OBX_field__set_id] = ''
prev_identity[OBX_field__subid] = ''
prev_identity[OBX_field__value] = ''
prev_identity = '|'.join(prev_identity)
continue
# non-first FT type OBX
curr_identity = line.split('|')
curr_identity[OBX_field__set_id] = ''
curr_identity[OBX_field__subid] = ''
curr_identity[OBX_field__value] = ''
curr_identity = '|'.join(curr_identity)
if curr_identity != prev_identity:
# write out previous line
hl7_out.write('|'.join(prev_fields) + HL7_EOL)
# keep current fields, since it may start a "repeat FT type OBX block"
prev_fields = curr_fields
prev_identity = curr_identity
continue
if prev_fields[OBX_field__value].endswith(HL7_BRK):
prev_fields[OBX_field__value] += curr_fields[OBX_field__value]
else:
if curr_fields[OBX_field__value].startswith(HL7_BRK):
prev_fields[OBX_field__value] += curr_fields[OBX_field__value]
else:
prev_fields[OBX_field__value] += HL7_BRK
prev_fields[OBX_field__value] += curr_fields[OBX_field__value]
if prev_fields is not None:
hl7_out.write('|'.join(prev_fields) + HL7_EOL)
hl7_out.close()
hl7_in.close()
return out3_fname
#------------------------------------------------------------
def __split_hl7_file_by_MSH(filename, encoding='utf8'):
_log.debug('splitting [%s] into single-MSH files', filename)
hl7_in = open(filename, mode = 'rt', encoding = encoding)
idx = 0
first_line = True
MSH_file = None
MSH_fnames = []
for line in hl7_in:
line = line.strip()
# first line must be MSH
if first_line:
# ignore empty / FHS / BHS lines
if line == '':
continue
if line.startswith('FHS|'):
_log.debug('ignoring FHS')
continue
if line.startswith('BHS|'):
_log.debug('ignoring BHS')
continue
if not line.startswith('MSH|'):
raise ValueError('HL7 file <%s> does not start with "MSH" line' % filename)
first_line = False
# start new file
if line.startswith('MSH|'):
if MSH_file is not None:
MSH_file.close()
idx += 1
out_fname = gmTools.get_unique_filename(prefix = '%s-MSH_%s-' % (gmTools.fname_stem(filename), idx), suffix = 'hl7')
_log.debug('writing message %s to [%s]', idx, out_fname)
MSH_fnames.append(out_fname)
MSH_file = open(out_fname, mode = 'wt', encoding = 'utf8', newline = '')
# ignore BTS / FTS lines
if line.startswith('BTS|'):
_log.debug('ignoring BTS')
continue
if line.startswith('FTS|'):
_log.debug('ignoring FTS')
continue
# else write line to new file
MSH_file.write(line + HL7_EOL)
if MSH_file is not None:
MSH_file.close()
hl7_in.close()
return MSH_fnames
#------------------------------------------------------------
def __split_MSH_by_PID(filename):
"""Assumes:
- ONE MSH per file
- utf8 encoding
- first non-empty line must be MSH line
- next line must be PID line
IOW, what's created by __split_hl7_file_by_MSH()
"""
_log.debug('splitting single-MSH file [%s] into single-PID files', filename)
MSH_in = open(filename, mode = 'rt', encoding = 'utf-8-sig')
looking_for_MSH = True
MSH_line = None
looking_for_first_PID = True
PID_file = None
PID_fnames = []
idx = 0
for line in MSH_in:
line = line.strip()
# ignore empty
if line == '':
continue
# first non-empty line must be MSH
if looking_for_MSH:
if line.startswith('MSH|'):
looking_for_MSH = False
MSH_line = line + HL7_EOL
continue
raise ValueError('HL7 MSH file <%s> does not start with "MSH" line' % filename)
else:
if line.startswith('MSH|'):
raise ValueError('HL7 single-MSH file <%s> contains more than one MSH line' % filename)
# first non-empty line after MSH must be PID
if looking_for_first_PID:
if not line.startswith('PID|'):
raise ValueError('HL7 MSH file <%s> does not have "PID" line follow "MSH" line' % filename)
looking_for_first_PID = False
# start new file if line is PID
if line.startswith('PID|'):
if PID_file is not None:
PID_file.close()
idx += 1
out_fname = gmTools.get_unique_filename(prefix = '%s-PID_%s-' % (gmTools.fname_stem(filename), idx), suffix = 'hl7')
_log.debug('writing message for PID %s to [%s]', idx, out_fname)
PID_fnames.append(out_fname)
PID_file = open(out_fname, mode = 'wt', encoding = 'utf8', newline = '')
PID_file.write(MSH_line)
# else write line to new file
PID_file.write(line + HL7_EOL)
if PID_file is not None:
PID_file.close()
MSH_in.close()
return PID_fnames
#------------------------------------------------------------
def __find_or_create_lab(hl7_lab, link_obj=None):
comment_tag = '[HL7 name::%s]' % hl7_lab
for gm_lab in gmPathLab.get_test_orgs():
if comment_tag in gmTools.coalesce(gm_lab['comment'], ''):
_log.debug('found lab [%s] from HL7 file in GNUmed database:', hl7_lab)
_log.debug(gm_lab)
return gm_lab
_log.debug('lab not found: %s', hl7_lab)
gm_lab = gmPathLab.create_test_org(link_obj = link_obj, name = hl7_lab, comment = comment_tag)
if gm_lab is None:
raise ValueError('cannot create lab [%s] in GNUmed' % hl7_lab)
_log.debug('created lab: %s', gm_lab)
return gm_lab
#------------------------------------------------------------
def __find_or_create_test_type(loinc=None, name=None, pk_lab=None, unit=None, link_obj=None, abbrev=None):
tt = gmPathLab.find_measurement_type(link_obj = link_obj, lab = pk_lab, name = name)
if tt is None:
_log.debug('test type [%s::%s::%s] not found for lab #%s, creating', name, unit, loinc, pk_lab)
tt = gmPathLab.create_measurement_type(link_obj = link_obj, lab = pk_lab, abbrev = gmTools.coalesce(abbrev, name), unit = unit, name = name)
_log.debug('created as: %s', tt)
if loinc is None:
return tt
if loinc.strip() == '':
return tt
if tt['loinc'] is None:
tt['loinc'] = loinc
tt.save(conn = link_obj)
return tt
if tt['loinc'] != loinc:
# raise ValueError('LOINC code mismatch between GM (%s) and HL7 (%s) for result type [%s]' % (tt['loinc'], loinc, name))
_log.error('LOINC code mismatch between GM (%s) and HL7 (%s) for result type [%s]', tt['loinc'], loinc, name)
return tt
#------------------------------------------------------------
def __ensure_hl7_test_types_exist_in_gnumed(link_obj=None, hl7_data=None, pk_test_org=None):
try:
OBX_count = len(hl7_data.segments('OBX'))
except KeyError:
_log.error("HL7 does not contain OBX segments, nothing to do")
return
for OBX_idx in range(OBX_count):
unit = hl7_data.extract_field(segment = 'OBX', segment_num = OBX_idx, field_num = OBX_field__unit)
if unit == '':
unit = None
LOINC = hl7_data.extract_field(segment = 'OBX', segment_num = OBX_idx, field_num = OBX_field__type, component_num = OBX_component__loinc)
tname = hl7_data.extract_field(segment = 'OBX', segment_num = OBX_idx, field_num = OBX_field__type, component_num = OBX_component__name)
__find_or_create_test_type (
loinc = LOINC,
name = tname,
pk_lab = pk_test_org,
unit = unit,
link_obj = link_obj
)
#------------------------------------------------------------
def __PID2dto(HL7=None):
pat_lname = HL7.extract_field('PID', segment_num = 1, field_num = PID_field__name, component_num = PID_component__lastname)
pat_fname = HL7.extract_field('PID', segment_num = 1, field_num = PID_field__name, component_num = PID_component__firstname)
pat_mname = HL7.extract_field('PID', segment_num = 1, field_num = PID_field__name, component_num = PID_component__middlename)
if pat_mname is not None:
pat_fname += ' '
pat_fname += pat_mname
_log.debug('patient data from PID segment: first=%s (middle=%s) last=%s', pat_fname, pat_mname, pat_lname)
dto = gmPerson.cDTO_person()
dto.firstnames = pat_fname
dto.lastnames = pat_lname
dto.gender = HL7_GENDERS[HL7.extract_field('PID', segment_num = 1, field_num = PID_field__gender)]
hl7_dob = HL7.extract_field('PID', segment_num = 1, field_num = PID_field__dob)
if hl7_dob is not None:
tmp = time.strptime(hl7_dob, '%Y%m%d')
dto.dob = pyDT.datetime(tmp.tm_year, tmp.tm_mon, tmp.tm_mday, tzinfo = gmDateTime.gmCurrentLocalTimezone)
idents = dto.get_candidate_identities()
if len(idents) == 0:
_log.warning('no match candidate, not auto-importing')
_log.debug(dto)
return []
if len(idents) > 1:
_log.warning('more than one match candidate, not auto-importing')
_log.debug(dto)
return idents
return [gmPerson.cPatient(idents[0].ID)]
#------------------------------------------------------------
def __hl7dt2pydt(hl7dt):
if hl7dt == '':
return None
if len(hl7dt) == 8:
tmp = time.strptime(hl7dt, '%Y%m%d')
return pyDT.datetime(tmp.tm_year, tmp.tm_mon, tmp.tm_mday, tzinfo = gmDateTime.gmCurrentLocalTimezone)
if len(hl7dt) == 12:
tmp = time.strptime(hl7dt, '%Y%m%d%H%M')
return pyDT.datetime(tmp.tm_year, tmp.tm_mon, tmp.tm_mday, tmp.tm_hour, tmp.tm_min, tzinfo = gmDateTime.gmCurrentLocalTimezone)
if len(hl7dt) == 14:
tmp = time.strptime(hl7dt, '%Y%m%d%H%M%S')
return pyDT.datetime(tmp.tm_year, tmp.tm_mon, tmp.tm_mday, tmp.tm_hour, tmp.tm_min, tmp.tm_sec, tzinfo = gmDateTime.gmCurrentLocalTimezone)
	raise ValueError('Observation timestamp not parseable: [%s]' % hl7dt)
#------------------------------------------------------------
def __import_single_PID_hl7_file(filename, emr=None):
"""Assumes single-PID/single-MSH HL7 file."""
_log.debug('importing single-PID single-MSH HL7 data from [%s]', filename)
# read the file
MSH_file = open(filename, mode = 'rt', encoding = 'utf-8-sig', newline = '')
HL7 = pyhl7.parse(MSH_file.read(1024 * 1024 * 5)) # 5 MB max
MSH_file.close()
# sanity checks
if len(HL7.segments('MSH')) != 1:
_log.error('more than one MSH segment')
return False
if len(HL7.segments('PID')) != 1:
_log.error('more than one PID segment')
return False
# ensure lab is in database
hl7_lab = HL7.extract_field('MSH', field_num = MSH_field__sending_lab)
gm_lab = __find_or_create_lab(hl7_lab)
# ensure test types exist
conn = gmPG2.get_connection(readonly = False)
__ensure_hl7_test_types_exist_in_gnumed(link_obj = conn, hl7_data = HL7, pk_test_org = gm_lab['pk_test_org'])
# find patient
if emr is None:
#PID = HL7.segment('PID')
pats = __PID2dto(HL7 = HL7)
if len(pats) == 0:
conn.rollback()
return False
if len(pats) > 1:
conn.rollback()
return False
emr = pats[0].emr
# import values: loop over segments
when_list = {}
current_result = None
previous_segment = None
had_errors = False
msh_seen = False
#pid_seen = False
last_obr = None
obr = {}
for seg_idx in range(len(HL7)):
seg = HL7[seg_idx]
seg_type = seg[0][0]
_log.debug('processing line #%s = segment of type <%s>', seg_idx, seg_type)
if seg_type == 'MSH':
msh_seen = True
if seg_type == 'PID':
if not msh_seen:
conn.rollback()
_log.error('PID segment before MSH segment')
return False
#pid_seen = True
if seg_type in ['MSH', 'PID']:
_log.info('segment already handled')
previous_segment = seg_type
obr = {}
current_result = None
continue
if seg_type in ['ORC']:
_log.info('currently ignoring %s segments', seg_type)
previous_segment = seg_type
obr = {}
current_result = None
continue
if seg_type == 'OBR':
previous_segment = seg_type
last_obr = seg
current_result = None
obr['abbrev'] = ('%s' % seg[OBR_field__service_name][0]).strip()
try:
obr['name'] = ('%s' % seg[OBR_field__service_name][1]).strip()
except IndexError:
obr['name'] = obr['abbrev']
for field_name in [OBR_field__ts_ended, OBR_field__ts_started, OBR_field__ts_specimen_received, OBR_field__ts_requested]:
obr['clin_when'] = seg[field_name][0].strip()
if obr['clin_when'] != '':
break
continue
if seg_type == 'OBX':
current_result = None
# determine value
val_alpha = seg[OBX_field__value][0].strip()
is_num, val_num = gmTools.input2decimal(initial = val_alpha)
if is_num:
val_alpha = None
else:
val_num = None
val_alpha = val_alpha.replace('\.br\\', '\n')
# determine test type
unit = seg[OBX_field__unit][0].strip()
if unit == '':
if is_num:
unit = '1/1'
else:
unit = None
test_type = __find_or_create_test_type (
loinc = '%s' % seg[OBX_field__type][0][OBX_component__loinc-1],
name = '%s' % seg[OBX_field__type][0][OBX_component__name-1],
pk_lab = gm_lab['pk_test_org'],
unit = unit,
link_obj = conn
)
# eventually, episode should be read from lab_request
epi = emr.add_episode (
link_obj = conn,
episode_name = 'administrative',
is_open = False,
allow_dupes = False
)
current_result = emr.add_test_result (
link_obj = conn,
episode = epi['pk_episode'],
type = test_type['pk_test_type'],
intended_reviewer = gmStaff.gmCurrentProvider()['pk_staff'],
val_num = val_num,
val_alpha = val_alpha,
unit = unit
)
# handle range information et al
ref_range = seg[OBX_field__range][0].strip()
if ref_range != '':
current_result.reference_range = ref_range
flag = seg[OBX_field__abnormal_flag][0].strip()
if flag != '':
current_result['abnormality_indicator'] = flag
current_result['status'] = seg[OBX_field__status][0].strip()
current_result['val_grouping'] = seg[OBX_field__subid][0].strip()
current_result['source_data'] = ''
if last_obr is not None:
current_result['source_data'] += str(last_obr)
current_result['source_data'] += '\n'
current_result['source_data'] += str(seg)
clin_when = seg[OBX_field__timestamp][0].strip()
if clin_when == '':
_log.warning('no <Observation timestamp> in OBX, trying OBR timestamp')
clin_when = obr['clin_when']
try:
clin_when = __hl7dt2pydt(clin_when)
except ValueError:
_log.exception('clin_when from OBX or OBR not useable, assuming <today>')
if clin_when is not None:
current_result['clin_when'] = clin_when
current_result.save(conn = conn)
when_list[gmDateTime.pydt_strftime(current_result['clin_when'], '%Y %b %d')] = 1
previous_segment = seg_type
continue
if seg_type == 'NTE':
note = seg[NET_field__note][0].strip().replace('\.br\\', '\n')
if note == '':
_log.debug('empty NTE segment')
previous_segment = seg_type # maybe not ? (HL7 providers happen to use empty NTE segments to "structure" raw HL7 |-)
continue
# if this is an NTE following an OBR (IOW an order-related
# comment): make this a test result all of its own :-)
if previous_segment == 'OBR':
_log.debug('NTE following OBR: general note, using OBR timestamp [%s]', obr['clin_when'])
current_result = None
name = obr['name']
if name == '':
name = _('Comment')
# FIXME: please suggest a LOINC for "order comment"
test_type = __find_or_create_test_type(name = name, pk_lab = gm_lab['pk_test_org'], abbrev = obr['abbrev'], link_obj = conn)
# eventually, episode should be read from lab_request
epi = emr.add_episode (
link_obj = conn,
episode_name = 'administrative',
is_open = False,
allow_dupes = False
)
nte_result = emr.add_test_result (
link_obj = conn,
episode = epi['pk_episode'],
type = test_type['pk_test_type'],
intended_reviewer = gmStaff.gmCurrentProvider()['pk_staff'],
val_alpha = note
)
#nte_result['val_grouping'] = seg[OBX_field__subid][0].strip()
nte_result['source_data'] = str(seg)
try:
nte_result['clin_when'] = __hl7dt2pydt(obr['clin_when'])
except ValueError:
_log.exception('no .clin_when from OBR for NTE pseudo-OBX available')
nte_result.save(conn = conn)
continue
if (previous_segment == 'OBX') and (current_result is not None):
current_result['source_data'] += '\n'
current_result['source_data'] += str(seg)
current_result['note_test_org'] = gmTools.coalesce (
current_result['note_test_org'],
note,
'%%s\n%s' % note
)
current_result.save(conn = conn)
previous_segment = seg_type
continue
_log.error('unexpected NTE segment')
had_errors = True
break
_log.error('unknown segment, aborting')
_log.debug('line: %s', seg)
had_errors = True
break
if had_errors:
conn.rollback()
return False
conn.commit()
# record import in chart
try:
no_results = len(HL7.segments('OBX'))
except KeyError:
no_results = '?'
soap = _(
'Imported HL7 file [%s]:\n'
' lab "%s" (%s@%s), %s results (%s)'
) % (
filename,
hl7_lab,
gm_lab['unit'],
gm_lab['organization'],
no_results,
' / '.join(list(when_list))
)
epi = emr.add_episode (
episode_name = 'administrative',
is_open = False,
allow_dupes = False
)
emr.add_clin_narrative (
note = soap,
soap_cat = None,
episode = epi
)
# keep copy of HL7 data in document archive
folder = gmPerson.cPatient(emr.pk_patient).document_folder
hl7_docs = folder.get_documents (
doc_type = 'HL7 data',
pk_episodes = [epi['pk_episode']],
order_by = 'clin_when DESC'
)
if len(hl7_docs) > 0:
# there should only ever be one unless the user manually creates more,
# also, it should always be the latest since "ORDER BY clin_when DESC"
hl7_doc = hl7_docs[0]
else:
hl7_doc = folder.add_document (
document_type = 'HL7 data',
encounter = emr.active_encounter['pk_encounter'],
episode = epi['pk_episode']
)
hl7_doc['comment'] = _('list of imported HL7 data files')
hl7_doc['pk_org_unit'] = gmPraxis.gmCurrentPraxisBranch()['pk_org_unit']
hl7_doc['clin_when'] = gmDateTime.pydt_now_here()
hl7_doc.save()
part = hl7_doc.add_part(file = filename)
part['obj_comment'] = _('Result dates: %s') % ' / '.join(list(when_list))
part.save()
hl7_doc.set_reviewed(technically_abnormal = False, clinically_relevant = False)
return True
#------------------------------------------------------------
# this is only used for testing here in this file
def __stage_MSH_as_incoming_data(filename, source=None, logfile=None):
"""Consumes single-MSH single-PID HL7 files."""
	_log.debug('staging [%s] as unmatched incoming HL7%s', filename, gmTools.coalesce(source, '', ' (%s)'))
# parse HL7
MSH_file = open(filename, mode = 'rt', encoding = 'utf-8-sig', newline = '')
raw_hl7 = MSH_file.read(1024 * 1024 * 5) # 5 MB max
MSH_file.close()
formatted_hl7 = format_hl7_message (
message = raw_hl7,
skip_empty_fields = True,
eol = '\n'
)
HL7 = pyhl7.parse(raw_hl7)
del raw_hl7
# import file
incoming = gmIncomingData.create_incoming_data('HL7%s' % gmTools.coalesce(source, '', ' (%s)'), filename)
if incoming is None:
return None
incoming.update_data_from_file(fname = filename)
incoming['comment'] = formatted_hl7
if logfile is not None:
log = open(logfile, mode = 'rt', encoding = 'utf-8-sig')
incoming['comment'] += '\n'
incoming['comment'] += ('-' * 80)
incoming['comment'] += '\n\n'
incoming['comment'] += log.read()
log.close()
try:
incoming['lastnames'] = HL7.extract_field('PID', segment_num = 1, field_num = PID_field__name, component_num = PID_component__lastname)
incoming['firstnames'] = HL7.extract_field('PID', segment_num = 1, field_num = PID_field__name, component_num = PID_component__firstname)
val = HL7.extract_field('PID', segment_num = 1, field_num = PID_field__name, component_num = PID_component__middlename)
if val is not None:
incoming['firstnames'] += ' '
incoming['firstnames'] += val
val = HL7.extract_field('PID', segment_num = 1, field_num = PID_field__dob)
if val is not None:
tmp = time.strptime(val, '%Y%m%d')
incoming['dob'] = pyDT.datetime(tmp.tm_year, tmp.tm_mon, tmp.tm_mday, tzinfo = gmDateTime.gmCurrentLocalTimezone)
val = HL7.extract_field('PID', segment_num = 1, field_num = PID_field__gender)
if val is not None:
incoming['gender'] = val
incoming['external_data_id'] = filename
#u'fk_patient_candidates',
# u'request_id', # request ID as found in <data>
# u'postcode',
# u'other_info', # other identifying info in .data
# u'requestor', # Requestor of data (e.g. who ordered test results) if available in source data.
# u'fk_identity',
# u'comment', # a free text comment on this row, eg. why is it here, error logs etc
# u'fk_provider_disambiguated' # The provider the data is relevant to.
except KeyError:
_log.exception('no PID segment, cannot add more data')
incoming.save()
return incoming
#============================================================
# main
#------------------------------------------------------------
if __name__ == "__main__":
if len(sys.argv) < 2:
sys.exit()
if sys.argv[1] != 'test':
sys.exit()
gmDateTime.init()
gmTools.gmPaths()
#-------------------------------------------------------
def test_import_HL7(filename):
# would normally be set by external configuration:
from Gnumed.business import gmPraxis
gmPraxis.gmCurrentPraxisBranch(branch = gmPraxis.get_praxis_branches()[0])
#if not import_hl7_file(filename):
# print("error with", filename)
#-------------------------------------------------------
def test_xml_extract():
hl7 = extract_HL7_from_XML_CDATA(sys.argv[2], './/Message')
print("HL7:", hl7)
#result, PID_fnames = split_hl7_file(hl7)
#print "result:", result
#print "per-PID MSH files:"
#for name in PID_fnames:
# print " ", name
#-------------------------------------------------------
def test_stage_hl7_from_xml():
hl7 = extract_HL7_from_XML_CDATA(sys.argv[2], './/Message')
print("HL7:", hl7)
result, PID_fnames = split_hl7_file(hl7)
print("result:", result)
print("staging per-PID HL7 files:")
for name in PID_fnames:
print(" file:", name)
__stage_MSH_as_incoming_data(name, source = 'Excelleris')
#-------------------------------------------------------
def test_split_hl7_file():
result, PID_fnames = split_hl7_file(sys.argv[2])
print("result:", result)
print("per-PID HL7 files:")
for name in PID_fnames:
print(" file:", name)
#-------------------------------------------------------
def test_stage_hl7():
fixed = __fix_malformed_hl7_file(sys.argv[2])
print("fixed HL7:", fixed)
#PID_fnames = split_HL7_by_PID(fixed, encoding='utf8')
print("staging per-PID HL7 files:")
for name in []: #PID_fnames:
print(" file:", name)
#print "", __stage_MSH_as_incoming_data(name, source = u'?')
#-------------------------------------------------------
def test_format_hl7_message():
tests = [
"OBR|1||03-1350023-LIP-0|LIP^Lipids||20031004073300|20031004073300|||||||20031004073300||22333^MEDIC^IAN^TEST||031350023||03-1350023|031350023|20031004131600||CHEM|F|||22333^MEDIC^IAN^TEST",
"OBX|2|NM|22748-8^LDL Cholesterol||4.0|mmol/L|1.5 - 3.4|H|||F|||20031004073300"
]
for test in tests:
print(format_hl7_message (
# skip_empty_fields = True,
message = test
))
#-------------------------------------------------------
def test_format_hl7_file(filename):
print(format_hl7_file (
filename,
# skip_empty_fields = True
return_filename = True
))
#-------------------------------------------------------
def test___fix_malformed_hl7():
print("fixed HL7:", __fix_malformed_hl7_file(sys.argv[2]))
#-------------------------------------------------------
def test_parse_hl7():
MSH_file = open(sys.argv[2], mode = 'rt', encoding = 'utf-8-sig', newline = '')
raw_hl7 = MSH_file.read(1024 * 1024 * 5) # 5 MB max
MSH_file.close()
print(format_hl7_message (
message = raw_hl7,
skip_empty_fields = True,
eol = '\n'
))
HL7 = pyhl7.parse(raw_hl7)
del raw_hl7
for seg in HL7.segments('MSH'):
print(seg)
print("PID:")
print(HL7.extract_field('PID'))
print(HL7.extract_field('PID', segment_num = 1, field_num = PID_field__name, component_num = PID_component__lastname))
print(HL7.extract_field('PID', segment_num = 1, field_num = PID_field__name, component_num = PID_component__lastname))
# incoming['firstnames'] = HL7.extract_field('PID', segment_num = 1, field_num = PID_field__name, component_num = PID_component__firstname)
# val = HL7.extract_field('PID', segment_num = 1, field_num = PID_field__name, component_num = PID_component__middlename)
# if val is not None:
# incoming['firstnames'] += u' '
# incoming['firstnames'] += val
# val = HL7.extract_field('PID', segment_num = 1, field_num = PID_field__dob)
# if val is not None:
# tmp = time.strptime(val, '%Y%m%d')
# incoming['dob'] = pyDT.datetime(tmp.tm_year, tmp.tm_mon, tmp.tm_mday, tzinfo = gmDateTime.gmCurrentLocalTimezone)
# val = HL7.extract_field('PID', segment_num = 1, field_num = PID_field__gender)
# if val is not None:
# incoming['gender'] = val
# incoming['external_data_id'] = filename
#-------------------------------------------------------
#test_import_HL7(sys.argv[2])
#test_xml_extract()
#test_stage_hl7_from_xml()
#test_stage_hl7()
#test_format_hl7_message()
#test_format_hl7_file(sys.argv[2])
#test___fix_malformed_hl7()
#test_split_hl7_file()
test_parse_hl7()
Functions
def extract_HL7_from_XML_CDATA(filename, xml_path, target_dir=None)
-
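Returns the name of the unwrapped .hl7 file placed into target_dir (or None on any failure) and moves the consumed XML file into a done/ directory. A minimal usage sketch; the input file name is hypothetical and the './/Message' XPath is borrowed from the module's own test code:

from Gnumed.business import gmHL7

# 'results.xml' is a made-up example; any XML wrapping HL7 in CDATA nodes will do
hl7_name = gmHL7.extract_HL7_from_XML_CDATA('results.xml', './/Message')
if hl7_name is None:
	print('extraction failed, check the log')
else:
	print('unwrapped HL7 written to:', hl7_name)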
def format_hl7_file(filename, skip_empty_fields=True, eol='\n ', return_filename=False, fix_hl7=True)
-
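Reads at most 5 MB of the file, optionally runs the malformed-HL7 fixer first, and either returns the formatted text or, with return_filename=True, writes it to a uniquely named file and returns that name. A sketch with a hypothetical input file; the formatting code relies on the _() translation helper, which the GNUmed client normally installs via gmI18N:

from Gnumed.pycommon import gmI18N
from Gnumed.business import gmHL7

# install the _() translation helper, as the module's own test harness does
gmI18N.activate_locale()
gmI18N.install_domain()

print(gmHL7.format_hl7_file('incoming.hl7', skip_empty_fields = True))
# or write the formatted rendering to a file and get its name back
print('formatted copy:', gmHL7.format_hl7_file('incoming.hl7', return_filename = True))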
def format_hl7_message(message=None, skip_empty_fields=True, eol='\n ', source=None)
-
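Formats an in-memory HL7 string: each segment and field is labelled via HL7_field_labels and the result is joined with eol; passing eol=None returns the raw list of [label, value] pairs instead. A sketch using the OBX sample line from the module's test code (the _() helper is assumed to be installed, as shown for format_hl7_file above):

from Gnumed.business import gmHL7

obx = "OBX|2|NM|22748-8^LDL Cholesterol||4.0|mmol/L|1.5 - 3.4|H|||F|||20031004073300"
print(gmHL7.format_hl7_message(message = obx, skip_empty_fields = True, eol = '\n'))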
def import_single_PID_hl7_file(filename)
-
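Wraps the internal single-PID/single-MSH importer with a per-file log (<filename>.import.log) and returns a (success, log_name) tuple. A hedged sketch with a made-up file name; it assumes a connected GNUmed backend, a logged-in provider and a current praxis branch, roughly as test_import_HL7() sets up:

from Gnumed.business import gmHL7

success, log_name = gmHL7.import_single_PID_hl7_file('lab-download-PID_1.hl7')
if not success:
	print('import failed, details in:', log_name)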
def process_staged_single_PID_hl7_file(staged_item)
-
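Takes an already staged clin.incoming_data item (as produced by stage_single_PID_hl7_file()), locks it, re-exports the raw HL7 to a file and imports it; on success the staged row is deleted, on failure the import log and a formatted HL7 dump are appended to the item's comment. A sketch; how the staged item is obtained from gmIncomingData is deliberately left open here:

from Gnumed.business import gmHL7

# staged_item: an incoming-data row previously created by stage_single_PID_hl7_file()
success, log_name = gmHL7.process_staged_single_PID_hl7_file(staged_item)
print('imported' if success else 'import failed, see %s' % log_name)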
def split_hl7_file(filename, target_dir=None, encoding='utf8')
-
Multi-step processing of HL7 files.
- input can be multi-MSH / multi-PID / partially malformed HL7
- tries to fix oddities
- splits by MSH
- splits by PID into <target_dir>
- needs write permissions in dir_of(filename)
- moves HL7 files which were successfully split up into dir_of(filename)/done/
- returns (True|False, list_of_PID_files)
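The typical use is to split a raw multi-message download and then stage each per-PID file, similar to what test_stage_hl7_from_xml() does. A sketch with a hypothetical input file (a connected GNUmed backend and an installed _() helper are assumed for the staging step):

from Gnumed.business import gmHL7

ok, pid_files = gmHL7.split_hl7_file('lab-download.hl7')
if not ok:
	print('splitting failed, see the .split.log file next to the input')
else:
	for pid_file in pid_files:
		gmHL7.stage_single_PID_hl7_file(pid_file, source = 'lab-download')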
def stage_single_PID_hl7_file(filename, source=None, encoding='utf8')
-
Multi-step processing of HL7 files.
- input must be single-MSH / single-PID / normalized HL7
- imports into clin.incoming_data
- needs write permissions in dir_of(filename)
- moves PID files which were successfully staged into dir_of(filename)/done/PID/
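Returns True or False; on success the file ends up as a row in clin.incoming_data with name, date of birth and gender pre-filled from the PID segment, ready to be matched and imported later with process_staged_single_PID_hl7_file(). A minimal sketch with a hypothetical per-PID file (the 'Excelleris' source tag is borrowed from the test code):

from Gnumed.business import gmHL7

if gmHL7.stage_single_PID_hl7_file('lab-download-PID_1.hl7', source = 'Excelleris'):
	print('staged as unmatched incoming HL7')
else:
	print('staging failed, file and log moved to the failed/ directory')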