Module Gnumed.pycommon.gmCfgINI
GNUmed INI style configuration handling.
Expand source code
"""GNUmed INI style configuration handling.
__author__ = "Karsten Hilbert <>"
__licence__ = "GPL"
import logging
import sys
import re as regex
import shutil
import tempfile
if __name__ == "__main__":
sys.path.insert(0, '../../')
from Gnumed.pycommon import gmBorg
_log = logging.getLogger('gm.cfg')
# helper functions
def __set_opt_in_INI_file(src=None, sink=None, group=None, option=None, value=None):
group_seen = False
option_seen = False
for line in src:
# after option already ?
if option_seen:
# start of list ?
if regex.match(r'(?P<list_name>.+)(\s|\t)*=(\s|\t)*\$(?P=list_name)\$', line) is not None:
# end of list ?
if regex.match(r'\$.+\$.*', line) is not None:
# our group ?
if line.strip() == '[%s]' % group:
group_seen = True
# another group ?
if regex.match(r'\[.+\].*', line) is not None:
# next group but option not seen yet ?
if group_seen and not option_seen:
sink.write('%s = %s\n\n\n' % (option, value))
option_seen = True
# our option ?
if regex.match(r'%s(\s|\t)*=' % option, line) is not None:
if group_seen:
sink.write('%s = %s\n' % (option, value))
option_seen = True
# something else (comment, empty line, or other option)
# all done ?
if option_seen:
# need to add group ?
if not group_seen:
sink.write('[%s]\n' % group)
# We either just added the group or it was the last group
# but did not contain the option. It must have been the
# last group then or else the following group would have
# triggered the option writeout.
sink.write('%s = %s\n' % (option, value))
def __set_list_in_INI_file(src=None, sink=None, group=None, option=None, value=None):
our_group_seen = False
inside_our_group = False
our_list_seen = False
inside_our_list = False
# loop until group found or src empty
for line in src:
if inside_our_list: # can only be true if already inside our group
# new list has been written already
# so now at end of our (old) list ?
if regex.match(r'\$%s\$' % option, line.strip()) is not None:
inside_our_list = False
# skip old list entries
if inside_our_group:
# our option ?
if regex.match(r'%s(\s|\t)*=(\s|\t)*\$%s\$' % (option, option), line.strip()) is not None:
sink.write(line) # list header
sink.write('$%s$\n' % option) # list footer
our_list_seen = True
inside_our_list = True
# next group (= end of our group) ?
if regex.match(r'\[.+\]', line.strip()) is not None:
# our list already handled ? (if so must already be finished)
if not our_list_seen:
# no, so need to add our list to the group before ...
sink.write('%s = $%s$\n' % (option, option)) # list header
sink.write('$%s$\n' % option) # list footer
our_list_seen = True
inside_our_list = False
# ... starting the next group
sink.write(line) # next group header
inside_our_group = False
# other lines inside our group
# our group ?
if line.strip() == '[%s]' % group:
our_group_seen = True
inside_our_group = True
sink.write(line) # group header
# looped over all lines but did not find our group, so add group
if not our_group_seen:
sink.write('[%s]\n' % group)
if not our_list_seen:
# We either just added the group or it was the last group
# but did not contain the option. It must have been the
# last group then or else the group following it would have
# triggered the option writeout.
sink.write('%s = $%s$\n' % (option, option))
sink.write('$%s$\n' % option)
def set_option_in_INI_file(filename=None, group=None, option=None, value=None, encoding='utf8'):
_log.debug('setting option "%s" to "%s" in group [%s]', option, value, group)
_log.debug('file: %s (%s)', filename, encoding)
sink = tempfile.NamedTemporaryFile(suffix = '.cfg', delete = True)
sink_name =
sink.close() # close it so it gets deleted so we can safely open it again
src = open(filename, mode = 'rt', encoding = encoding)
sink = open(sink_name, mode = 'wt', encoding = encoding)
# is value a list ?
if isinstance(value, type([])):
__set_list_in_INI_file(src, sink, group, option, value)
__set_opt_in_INI_file(src, sink, group, option, value)
shutil.copy2(sink_name, filename)
def parse_INI_stream(stream=None, encoding=None):
"""Parse an iterable for INI-style data.
Returns a dict by sections containing a dict of values per section.
_log.debug('parsing INI-style data stream [%s] using [%s]', stream, encoding)
if encoding is None:
encoding = 'utf8'
data = {}
current_group = None
current_option = None
current_option_path = None
inside_list = False
line_idx = 0
for line in stream:
if type(line) is bytes:
line = line.decode(encoding)
line = line.replace('\015', '').replace('\012', '').strip()
line_idx += 1
if inside_list:
if line == '$%s$' % current_option: # end of list
inside_list = False
# noise
if line == '' or line.startswith('#') or line.startswith(';'):
# group
if line.startswith('['):
if not line.endswith(']'):
_log.error('group line does not end in "]", aborting')
raise ValueError('INI-stream parsing error')
group = line.strip('[]').strip()
if group == '':
_log.error('group name is empty, aborting')
raise ValueError('INI-stream parsing error')
current_group = group
# option
if current_group is None:
_log.warning('option found before first group, ignoring')
name, remainder = regex.split(r'\s*[=:]\s*', line, maxsplit = 1)
if name == '':
_log.error('option name empty, aborting')
raise ValueError('INI-stream parsing error')
if remainder.strip() == '':
if ('=' not in line) and (':' not in line):
_log.error('missing name/value separator (= or :), aborting')
raise ValueError('INI-stream parsing error')
current_option = name
current_option_path = '%s::%s' % (current_group, current_option)
if current_option_path in data:
_log.warning('duplicate option [%s]', current_option_path)
value = remainder.split('#', 1)[0].strip()
# start of list ?
if value == '$%s$' % current_option:
inside_list = True
data[current_option_path] = []
data[current_option_path] = value
if inside_list:
_log.critical('unclosed list $%s$ detected at end of config stream [%s]', current_option, stream)
raise SyntaxError('end of config stream but still in list')
return data
class gmCfgData(gmBorg.cBorg):
def __init__(self):
except AttributeError:
self.__cfg_data = {}
self.source_files = {}
def get(self, group=None, option=None, source_order=None):
"""Get the value of a configuration option in a config file.
<source_order> the order in which config files are searched
a list of tuples (source, policy)
return: return only this value immediately
append: append to list of potential values to return
extend: if the value per source happens to be a list
extend (rather than append to) the result list
returns NONE when there's no value for an option
if source_order is None:
source_order = [('internal', 'return')]
results = []
for source, policy in source_order:
_log.debug('searching "%s" in [%s] in %s', option, group, source)
if group is None:
group = source
option_path = '%s::%s' % (group, option)
try: source_data = self.__cfg_data[source]
except KeyError:
_log.error('invalid config source [%s]', source)
_log.debug('currently known sources: %s', list(self.__cfg_data))
try: value = source_data[option_path]
except KeyError:
_log.debug('option [%s] not in group [%s] in source [%s]', option, group, source)
_log.debug('option [%s] found in source [%s]', option_path, source)
if policy == 'return':
return value
if policy == 'extend':
if isinstance(value, type([])):
if len(results) == 0:
return None
return results
def set_option(self, option=None, value=None, group=None, source=None):
"""Set a particular option to a particular value.
Note that this does NOT PERSIST the option anywhere !
if None in [option, value]:
raise ValueError('neither <option> nor <value> can be None')
if source is None:
source = 'internal'
except KeyError:
self.__cfg_data[source] = {}
if group is None:
group = source
option_path = '%s::%s' % (group, option)
self.__cfg_data[source][option_path] = value
# API: source related
def add_stream_source(self, source=None, stream=None, encoding=None):
data = parse_INI_stream(stream = stream, encoding = encoding)
if source in self.__cfg_data:
_log.warning('overriding source <%s> with [%s]', source, stream)
self.__cfg_data[source] = data
def add_file_source(self, source=None, file=None, encoding='utf8'):
"""Add a source (a file) to the instance."""'file source "%s": %s (%s)', source, file, encoding)
for existing_source, existing_file in self.source_files.items():
if existing_file == file:
if source != existing_source:
_log.warning('file [%s] already known as source [%s]', file, existing_source)
_log.warning('adding it as source [%s] may provoke trouble', source)
cfg_file = None
if file is not None:
cfg_file = open(file, mode = 'rt', encoding = encoding)
except IOError:
_log.error('cannot open [%s], keeping as dummy source', file)
if cfg_file is None:
file = None
if source in self.__cfg_data:
_log.warning('overriding source <%s> with dummy', source)
self.__cfg_data[source] = {}
self.add_stream_source(source = source, stream = cfg_file)
self.source_files[source] = file
def remove_source(self, source):
"""Remove a source from the instance."""'removing source <%s>', source)
del self.__cfg_data[source]
except KeyError:
_log.warning("source <%s> doesn't exist", source)
del self.source_files[source]
except KeyError:
def reload_file_source(self, file=None, encoding='utf8'):
if file not in self.source_files.values():
for src, fname in self.source_files.items():
if fname == file:
self.add_file_source(source = src, file = fname, encoding = encoding)
# don't break the loop because there could be other sources
# with the same file (not very reasonable, I know)
def add_cli(self, short_options='', long_options=None):
"""Add command line parameters to config data.
string containing one-letter options such as u'h?' for -h -?
list of strings
'conf-file=' -> --conf-file=<...>
'debug' -> --debug
"""'adding command line arguments')
_log.debug('raw command line is:')
_log.debug('%s', sys.argv)
import getopt
if long_options is None:
long_options = []
opts, remainder = getopt.gnu_getopt (
data = {}
for opt, val in opts:
if val == '':
data['%s::%s' % ('cli', opt)] = True
data['%s::%s' % ('cli', opt)] = val
self.__cfg_data['cli'] = data
# main
if __name__ == "__main__":
if len(sys.argv) < 2:
if sys.argv[1] != 'test':
logging.basicConfig(level = logging.DEBUG)
def test_gmCfgData():
cfg = gmCfgData()
cfg.add_cli(short_options='h?', long_options=['help', 'conf-file='])
cfg.set_option('internal option', True)
print (cfg.get(option = '--help', source_order = [('cli', 'return')]))
print (cfg.get(option = '-?', source_order = [('cli', 'return')]))
fname = cfg.get(option = '--conf-file', source_order = [('cli', 'return')])
if fname is not None:
cfg.add_file_source(source = 'explicit', file = fname)
def test_set_list_opt():
src = [
'# a comment',
'[empty group]',
'[second group]',
'some option = in second group',
'# another comment',
'[test group]',
'test list = $test list$',
'old 1',
'old 2',
'$test list$',
'# another group:',
'[dummy group]'
__set_list_in_INI_file (
src = src,
sink = sys.stdout,
group = 'test group',
option = 'test list',
value = list('123')
def test_set_opt():
src = [
'# a comment',
'[empty group]',
'# another comment',
'[second group]',
'some option = in second group',
'[trap group]',
'trap list = $trap list$',
'dummy 1',
'test option = a trap',
'dummy 2',
'$trap list$',
'[test group]',
'test option = for real (old)',
__set_opt_in_INI_file (
src = src,
sink = sys.stdout,
group = 'test group',
option = 'test option',
value = 'for real (new)'
def test_parse_ini_stream():
data = parse_INI_stream(stream = open(sys.argv[2], 'r', encoding = 'utf8'))
for key in data:
print(key, data[key])
_cfg = gmCfgData()
_cfg.add_file_source(source = 'prefs', file = sys.argv[2])
print(_cfg.get('preferences', 'login', [('prefs', 'return')]))
print(_cfg.get('preferences', 'most recently used praxis branch', [('prefs', 'return')]))
def parse_INI_stream(stream=None, encoding=None)
Parse an iterable for INI-style data.
Returns a dict by sections containing a dict of values per section.
Expand source code
def parse_INI_stream(stream=None, encoding=None): """Parse an iterable for INI-style data. Returns a dict by sections containing a dict of values per section. """ _log.debug('parsing INI-style data stream [%s] using [%s]', stream, encoding) if encoding is None: encoding = 'utf8' data = {} current_group = None current_option = None current_option_path = None inside_list = False line_idx = 0 for line in stream: if type(line) is bytes: line = line.decode(encoding) line = line.replace('\015', '').replace('\012', '').strip() line_idx += 1 if inside_list: if line == '$%s$' % current_option: # end of list inside_list = False continue data[current_option_path].append(line) continue # noise if line == '' or line.startswith('#') or line.startswith(';'): continue # group if line.startswith('['): if not line.endswith(']'): _log.error('group line does not end in "]", aborting') _log.error(line) raise ValueError('INI-stream parsing error') group = line.strip('[]').strip() if group == '': _log.error('group name is empty, aborting') _log.error(line) raise ValueError('INI-stream parsing error') current_group = group continue # option if current_group is None: _log.warning('option found before first group, ignoring') _log.error(line) continue name, remainder = regex.split(r'\s*[=:]\s*', line, maxsplit = 1) if name == '': _log.error('option name empty, aborting') _log.error(line) raise ValueError('INI-stream parsing error') if remainder.strip() == '': if ('=' not in line) and (':' not in line): _log.error('missing name/value separator (= or :), aborting') _log.error(line) raise ValueError('INI-stream parsing error') current_option = name current_option_path = '%s::%s' % (current_group, current_option) if current_option_path in data: _log.warning('duplicate option [%s]', current_option_path) value = remainder.split('#', 1)[0].strip() # start of list ? if value == '$%s$' % current_option: inside_list = True data[current_option_path] = [] continue data[current_option_path] = value if inside_list: _log.critical('unclosed list $%s$ detected at end of config stream [%s]', current_option, stream) raise SyntaxError('end of config stream but still in list') return data
def set_option_in_INI_file(filename=None, group=None, option=None, value=None, encoding='utf8')
Expand source code
def set_option_in_INI_file(filename=None, group=None, option=None, value=None, encoding='utf8'): _log.debug('setting option "%s" to "%s" in group [%s]', option, value, group) _log.debug('file: %s (%s)', filename, encoding) sink = tempfile.NamedTemporaryFile(suffix = '.cfg', delete = True) sink_name = sink.close() # close it so it gets deleted so we can safely open it again src = open(filename, mode = 'rt', encoding = encoding) sink = open(sink_name, mode = 'wt', encoding = encoding) # is value a list ? if isinstance(value, type([])): __set_list_in_INI_file(src, sink, group, option, value) else: __set_opt_in_INI_file(src, sink, group, option, value) sink.close() src.close() shutil.copy2(sink_name, filename)
class gmCfgData
A generic Borg mixin for new-style classes.
mixin this class with your class' ancestors to borg it
there may be many instances of this - PER CHILD CLASS - but they all share state
Expand source code
class gmCfgData(gmBorg.cBorg): def __init__(self): try: self.__cfg_data except AttributeError: self.__cfg_data = {} self.source_files = {} #-------------------------------------------------- def get(self, group=None, option=None, source_order=None): """Get the value of a configuration option in a config file. <source_order> the order in which config files are searched a list of tuples (source, policy) policy: return: return only this value immediately append: append to list of potential values to return extend: if the value per source happens to be a list extend (rather than append to) the result list returns NONE when there's no value for an option """ if source_order is None: source_order = [('internal', 'return')] results = [] for source, policy in source_order: _log.debug('searching "%s" in [%s] in %s', option, group, source) if group is None: group = source option_path = '%s::%s' % (group, option) try: source_data = self.__cfg_data[source] except KeyError: _log.error('invalid config source [%s]', source) _log.debug('currently known sources: %s', list(self.__cfg_data)) continue try: value = source_data[option_path] except KeyError: _log.debug('option [%s] not in group [%s] in source [%s]', option, group, source) continue _log.debug('option [%s] found in source [%s]', option_path, source) if policy == 'return': return value if policy == 'extend': if isinstance(value, type([])): results.extend(value) else: results.append(value) else: results.append(value) if len(results) == 0: return None return results #-------------------------------------------------- def set_option(self, option=None, value=None, group=None, source=None): """Set a particular option to a particular value. Note that this does NOT PERSIST the option anywhere ! """ if None in [option, value]: raise ValueError('neither <option> nor <value> can be None') if source is None: source = 'internal' try: self.__cfg_data[source] except KeyError: self.__cfg_data[source] = {} if group is None: group = source option_path = '%s::%s' % (group, option) self.__cfg_data[source][option_path] = value #-------------------------------------------------- # API: source related #-------------------------------------------------- def add_stream_source(self, source=None, stream=None, encoding=None): data = parse_INI_stream(stream = stream, encoding = encoding) if source in self.__cfg_data: _log.warning('overriding source <%s> with [%s]', source, stream) self.__cfg_data[source] = data #-------------------------------------------------- def add_file_source(self, source=None, file=None, encoding='utf8'): """Add a source (a file) to the instance."""'file source "%s": %s (%s)', source, file, encoding) for existing_source, existing_file in self.source_files.items(): if existing_file == file: if source != existing_source: _log.warning('file [%s] already known as source [%s]', file, existing_source) _log.warning('adding it as source [%s] may provoke trouble', source) cfg_file = None if file is not None: try: cfg_file = open(file, mode = 'rt', encoding = encoding) except IOError: _log.error('cannot open [%s], keeping as dummy source', file) if cfg_file is None: file = None if source in self.__cfg_data: _log.warning('overriding source <%s> with dummy', source) self.__cfg_data[source] = {} else: self.add_stream_source(source = source, stream = cfg_file) cfg_file.close() self.source_files[source] = file #-------------------------------------------------- def remove_source(self, source): """Remove a source from the instance."""'removing source <%s>', source) try: del self.__cfg_data[source] except KeyError: _log.warning("source <%s> doesn't exist", source) try: del self.source_files[source] except KeyError: pass #-------------------------------------------------- def reload_file_source(self, file=None, encoding='utf8'): if file not in self.source_files.values(): return for src, fname in self.source_files.items(): if fname == file: self.add_file_source(source = src, file = fname, encoding = encoding) # don't break the loop because there could be other sources # with the same file (not very reasonable, I know) #break #-------------------------------------------------- def add_cli(self, short_options='', long_options=None): """Add command line parameters to config data. short: string containing one-letter options such as u'h?' for -h -? long: list of strings 'conf-file=' -> --conf-file=<...> 'debug' -> --debug """'adding command line arguments') _log.debug('raw command line is:') _log.debug('%s', sys.argv) import getopt if long_options is None: long_options = [] opts, remainder = getopt.gnu_getopt ( sys.argv[1:], short_options, long_options ) data = {} for opt, val in opts: if val == '': data['%s::%s' % ('cli', opt)] = True else: data['%s::%s' % ('cli', opt)] = val self.__cfg_data['cli'] = data
def add_cli(self, short_options='', long_options=None)
Add command line parameters to config data.
short: string containing one-letter options such as u'h?' for -h -? long: list of strings 'conf-file=' -> –conf-file=<…> 'debug' -> –debug
Expand source code
def add_cli(self, short_options='', long_options=None): """Add command line parameters to config data. short: string containing one-letter options such as u'h?' for -h -? long: list of strings 'conf-file=' -> --conf-file=<...> 'debug' -> --debug """'adding command line arguments') _log.debug('raw command line is:') _log.debug('%s', sys.argv) import getopt if long_options is None: long_options = [] opts, remainder = getopt.gnu_getopt ( sys.argv[1:], short_options, long_options ) data = {} for opt, val in opts: if val == '': data['%s::%s' % ('cli', opt)] = True else: data['%s::%s' % ('cli', opt)] = val self.__cfg_data['cli'] = data
def add_file_source(self, source=None, file=None, encoding='utf8')
Add a source (a file) to the instance.
Expand source code
def add_file_source(self, source=None, file=None, encoding='utf8'): """Add a source (a file) to the instance."""'file source "%s": %s (%s)', source, file, encoding) for existing_source, existing_file in self.source_files.items(): if existing_file == file: if source != existing_source: _log.warning('file [%s] already known as source [%s]', file, existing_source) _log.warning('adding it as source [%s] may provoke trouble', source) cfg_file = None if file is not None: try: cfg_file = open(file, mode = 'rt', encoding = encoding) except IOError: _log.error('cannot open [%s], keeping as dummy source', file) if cfg_file is None: file = None if source in self.__cfg_data: _log.warning('overriding source <%s> with dummy', source) self.__cfg_data[source] = {} else: self.add_stream_source(source = source, stream = cfg_file) cfg_file.close() self.source_files[source] = file
def add_stream_source(self, source=None, stream=None, encoding=None)
Expand source code
def add_stream_source(self, source=None, stream=None, encoding=None): data = parse_INI_stream(stream = stream, encoding = encoding) if source in self.__cfg_data: _log.warning('overriding source <%s> with [%s]', source, stream) self.__cfg_data[source] = data
def get(self, group=None, option=None, source_order=None)
Get the value of a configuration option in a config file.
the order in which config files are searched a list of tuples (source, policy) policy: return: return only this value immediately append: append to list of potential values to return extend: if the value per source happens to be a list extend (rather than append to) the result list returns NONE when there's no value for an option
Expand source code
def get(self, group=None, option=None, source_order=None): """Get the value of a configuration option in a config file. <source_order> the order in which config files are searched a list of tuples (source, policy) policy: return: return only this value immediately append: append to list of potential values to return extend: if the value per source happens to be a list extend (rather than append to) the result list returns NONE when there's no value for an option """ if source_order is None: source_order = [('internal', 'return')] results = [] for source, policy in source_order: _log.debug('searching "%s" in [%s] in %s', option, group, source) if group is None: group = source option_path = '%s::%s' % (group, option) try: source_data = self.__cfg_data[source] except KeyError: _log.error('invalid config source [%s]', source) _log.debug('currently known sources: %s', list(self.__cfg_data)) continue try: value = source_data[option_path] except KeyError: _log.debug('option [%s] not in group [%s] in source [%s]', option, group, source) continue _log.debug('option [%s] found in source [%s]', option_path, source) if policy == 'return': return value if policy == 'extend': if isinstance(value, type([])): results.extend(value) else: results.append(value) else: results.append(value) if len(results) == 0: return None return results
def reload_file_source(self, file=None, encoding='utf8')
Expand source code
def reload_file_source(self, file=None, encoding='utf8'): if file not in self.source_files.values(): return for src, fname in self.source_files.items(): if fname == file: self.add_file_source(source = src, file = fname, encoding = encoding) # don't break the loop because there could be other sources # with the same file (not very reasonable, I know) #break
def remove_source(self, source)
Remove a source from the instance.
Expand source code
def remove_source(self, source): """Remove a source from the instance."""'removing source <%s>', source) try: del self.__cfg_data[source] except KeyError: _log.warning("source <%s> doesn't exist", source) try: del self.source_files[source] except KeyError: pass
def set_option(self, option=None, value=None, group=None, source=None)
Set a particular option to a particular value.
Note that this does NOT PERSIST the option anywhere !
Expand source code
def set_option(self, option=None, value=None, group=None, source=None): """Set a particular option to a particular value. Note that this does NOT PERSIST the option anywhere ! """ if None in [option, value]: raise ValueError('neither <option> nor <value> can be None') if source is None: source = 'internal' try: self.__cfg_data[source] except KeyError: self.__cfg_data[source] = {} if group is None: group = source option_path = '%s::%s' % (group, option) self.__cfg_data[source][option_path] = value