Module Gnumed.pycommon.gmWorkerThread
GNUmed worker threads.
wx.CallAfter() does not seem to work with multiprocessing !
Expand source code
"""GNUmed worker threads.
wx.CallAfter() does not seem to work with _multiprocessing_ !
"""
#=====================================================================
__author__ = "K.Hilbert <karsten.hilbert@gmx.net>"
__license__ = "GPL v2 or later"
import sys
import logging
import threading
import datetime as dt
import pickle
import copy
if __name__ == '__main__':
sys.path.insert(0, '../../')
_log = logging.getLogger('gm.worker')
#=====================================================================
def execute_in_worker_thread(payload_function=None, payload_kwargs:dict=None, completion_callback=None, worker_name:str=None) -> int:
"""Create a thread and have it execute "payload_function".
Args:
payload_function: function to actually run in the thread
payload_kwargs: keyword arguments to pass to "payload_function"
completion_callback: must be able to consume the results of "payload_function" unless "None"
worker_name: optional worker thread name
Returns:
ID of worker thread
"""
assert (callable(payload_function)), 'payload function <%s> is not callable' % payload_function
assert ((completion_callback is None) or callable(completion_callback)), 'completion callback <%s> is not callable' % completion_callback
_log.debug('worker [%s]', worker_name)
# try to decouple from calling thread
try:
__payload_kwargs = copy.deepcopy(payload_kwargs)
except (copy.error, pickle.PickleError):
_log.exception('failed to copy.deepcopy(payload_kwargs): %s', payload_kwargs)
_log.error('using shallow copy and hoping for the best')
__payload_kwargs = copy.copy(payload_kwargs)
worker_thread = None
#-------------------------------
def _run_payload():
"""Execute the payload function.
Defined inline so it can locally access arguments and the completion callback.
"""
try:
if payload_kwargs is None:
payload_result = payload_function()
else:
payload_result = payload_function(**__payload_kwargs)
_log.debug('finished running payload function: %s', payload_function)
except Exception:
_log.exception('error running payload function: %s', payload_function)
return
if completion_callback is None:
return
try:
completion_callback(payload_result)
_log.debug('finished running completion callback')
except Exception:
_log.exception('error running completion callback: %s', completion_callback)
_log.info('worker thread [name=%s, PID=%s] shuts down', worker_thread.name, worker_thread.ident)
return
#-------------------------------
if worker_name is None:
__thread_name = dt.datetime.now().strftime('%f-%S')
else:
__thread_name = '%sThread-%s' % (
worker_name,
dt.datetime.now().strftime('%f')
)
_log.debug('creating thread "%s"', __thread_name)
_log.debug(' "%s" payload function: %s', __thread_name, payload_function)
_log.debug(' "%s" results callback: %s', __thread_name, completion_callback)
worker_thread = threading.Thread (
target = _run_payload,
name = __thread_name
)
# we don't want hung workers to prevent us from exiting GNUmed
worker_thread.daemon = True
_log.info('starting thread "%s"', __thread_name)
worker_thread.start()
_log.debug(' "%s" ident (= PID): %s', worker_thread.name, worker_thread.ident)
# from here on, another thread executes _run_payload()
# which executes payload_function() and, eventually,
# completion_callback() if available,
# return thread ident so people can join() it if needed
return worker_thread.ident
#=====================================================================
# main
#=====================================================================
if __name__ == "__main__":
if len(sys.argv) < 2:
sys.exit()
if sys.argv[1] != 'test':
sys.exit()
import time
import random
def test_print_dots(ident=None):
"""Tests executing a function in a worker thread.
The thread slowly prints dots to stdout.
"""
def slowly_print_dots(info=None):
"""This slowly prints dots.
:param str info: some identifier
To be run in each thread."""
for idx in range(5):
print('* (#%s in %s)' % (idx, info))
time.sleep(1 + (random.random()*4))
return '%s' % time.localtime()
def print_dot_end_time(end_time):
"""Print the time printing dots ended.
:param str end_time: end time to print
Used as completion callback."""
print('done: %s' % end_time)
execute_in_worker_thread (
payload_function = slowly_print_dots,
payload_kwargs = {'info': ident},
completion_callback = print_dot_end_time
)
test_print_dots('A')
test_print_dots('B')
Functions
def execute_in_worker_thread(payload_function=None, payload_kwargs: dict = None, completion_callback=None, worker_name: str = None) ‑> int
-
Create a thread and have it execute "payload_function".
Args
payload_function
- function to actually run in the thread
payload_kwargs
- keyword arguments to pass to "payload_function"
completion_callback
- must be able to consume the results of "payload_function" unless "None"
worker_name
- optional worker thread name
Returns
ID of worker thread
Expand source code
def execute_in_worker_thread(payload_function=None, payload_kwargs:dict=None, completion_callback=None, worker_name:str=None) -> int: """Create a thread and have it execute "payload_function". Args: payload_function: function to actually run in the thread payload_kwargs: keyword arguments to pass to "payload_function" completion_callback: must be able to consume the results of "payload_function" unless "None" worker_name: optional worker thread name Returns: ID of worker thread """ assert (callable(payload_function)), 'payload function <%s> is not callable' % payload_function assert ((completion_callback is None) or callable(completion_callback)), 'completion callback <%s> is not callable' % completion_callback _log.debug('worker [%s]', worker_name) # try to decouple from calling thread try: __payload_kwargs = copy.deepcopy(payload_kwargs) except (copy.error, pickle.PickleError): _log.exception('failed to copy.deepcopy(payload_kwargs): %s', payload_kwargs) _log.error('using shallow copy and hoping for the best') __payload_kwargs = copy.copy(payload_kwargs) worker_thread = None #------------------------------- def _run_payload(): """Execute the payload function. Defined inline so it can locally access arguments and the completion callback. """ try: if payload_kwargs is None: payload_result = payload_function() else: payload_result = payload_function(**__payload_kwargs) _log.debug('finished running payload function: %s', payload_function) except Exception: _log.exception('error running payload function: %s', payload_function) return if completion_callback is None: return try: completion_callback(payload_result) _log.debug('finished running completion callback') except Exception: _log.exception('error running completion callback: %s', completion_callback) _log.info('worker thread [name=%s, PID=%s] shuts down', worker_thread.name, worker_thread.ident) return #------------------------------- if worker_name is None: __thread_name = dt.datetime.now().strftime('%f-%S') else: __thread_name = '%sThread-%s' % ( worker_name, dt.datetime.now().strftime('%f') ) _log.debug('creating thread "%s"', __thread_name) _log.debug(' "%s" payload function: %s', __thread_name, payload_function) _log.debug(' "%s" results callback: %s', __thread_name, completion_callback) worker_thread = threading.Thread ( target = _run_payload, name = __thread_name ) # we don't want hung workers to prevent us from exiting GNUmed worker_thread.daemon = True _log.info('starting thread "%s"', __thread_name) worker_thread.start() _log.debug(' "%s" ident (= PID): %s', worker_thread.name, worker_thread.ident) # from here on, another thread executes _run_payload() # which executes payload_function() and, eventually, # completion_callback() if available, # return thread ident so people can join() it if needed return worker_thread.ident