Module Gnumed.pycommon.gmNetworkTools
GNUmed internetworking tools.
Expand source code
# -*- coding: utf-8 -*-
"""GNUmed internetworking tools."""
#===========================================================================
__author__ = "K. Hilbert <Karsten.Hilbert@gmx.net>"
__license__ = "GPL v2 or later (details at https://www.gnu.org)"
# std libs
import sys
import time
import os.path
import logging
import urllib.request
import urllib.error
import zipfile
import webbrowser
# GNUmed libs
if __name__ == '__main__':
sys.path.insert(0, '../../')
_ = lambda x:x
from Gnumed.pycommon import gmLog2
from Gnumed.pycommon import gmTools
from Gnumed.pycommon import gmShellAPI
from Gnumed.pycommon import gmCfgINI
from Gnumed.pycommon import gmMimeLib
_log = logging.getLogger('gm.net')
#===========================================================================
# browser access
#---------------------------------------------------------------------------
def open_urls_in_browser(urls:list[str]=None, new:int=2, autoraise:bool=True, initial_delay:int=200, *args, **kwargs) -> bool:
"""Open URLs in a browser.
Args:
urls: URLs to open
new: whether to open a new browser, a new tab in a running browser, or a tab OR a browser (see Python docs), 2 = open new tab if possible
initial_delay: milliseconds to sleep after opening first URL (so that the browser may start up)
"""
if not urls:
return True
# can't timewarp just yet
duration2sleep = max(initial_delay, 0) / 1000
for url in urls:
open_url_in_browser(url = url, new = new, autoraise = autoraise)
time.sleep(duration2sleep)
duration2sleep = 0
return True
#---------------------------------------------------------------------------
def open_url_in_browser(url:str=None, new:int=2, autoraise:bool=True) -> bool:
"""Open an URL in a browser.
Args:
url: URL to open
new: whether to open a new browser, a new tab in a running browser, or a tab OR a browser, 2 = open new tab if possible
"""
if not url:
return True
try:
webbrowser.open(url, new = new, autoraise = autoraise)
except (webbrowser.Error, OSError, UnicodeEncodeError):
_log.exception('error calling browser with url=[%s]', url)
return False
return True
#---------------------------------------------------------------------------
def download_file(url, filename=None, suffix=None):
if filename is None:
filename = gmTools.get_unique_filename(prefix = 'gm-dl-', suffix = suffix)
_log.debug('downloading [%s] into [%s]', url, filename)
try:
dl_name, headers = urllib.request.urlretrieve(url, filename)
except (ValueError, OSError, IOError):
_log.exception('cannot download from [%s]', url)
gmLog2.log_stack_trace()
return None
_log.debug('%s' % headers)
return dl_name
#---------------------------------------------------------------------------
def mirror_url(url:str, base_dir:str=None, verbose:bool=False) -> str:
"""Mirror the web*page* at _url_, non-recursively.
Note: Not for mirroring a *site* (recursively).
Args:
url: the URL to mirror
base_dir: where to store the page and its prerequisites, sandbox dir under tmp_dir if None
"""
assert (url is not None), '<url> must not be None'
_log.debug('mirroring: %s', url)
if base_dir is None:
prefix = url.split('://')[-1]
prefix = prefix.strip(':').strip('/').replace('/', '#')
prefix = gmTools.fname_sanitize(prefix)
base_dir = gmTools.mk_sandbox_dir(prefix = prefix + '-')
_log.debug('base dir: %s', base_dir)
wget_cmd = [
'wget',
'--directory-prefix=%s' % base_dir,
#'--adjust-extension',
'--no-remove-listing',
'--timestamping',
'--page-requisites',
'--continue',
'--convert-links',
'--user-agent=""',
'--execute', 'robots=off',
'--wait=1'
]
if verbose:
wget_cmd.append('--debug')
wget_cmd.append(url)
#wget --output-file=logfile
#'<a href="%s">%s</a>' % (url, url),
success, ret_code, STDOUT = gmShellAPI.run_process(cmd_line = wget_cmd, timeout = 15, verbose = verbose)
if success:
return base_dir
return None
#===========================================================================
# data pack handling
#---------------------------------------------------------------------------
def download_data_packs_list(url, filename=None):
return download_file(url, filename = filename, suffix = 'conf')
#---------------------------------------------------------------------------
def download_data_pack(pack_url, filename=None, md5_url=None):
_log.debug('downloading data pack from: %s', pack_url)
dp_fname = download_file(pack_url, filename = filename, suffix = 'zip')
_log.debug('downloading MD5 from: %s', md5_url)
md5_fname = download_file(md5_url, filename = dp_fname + '.md5')
md5_file = open(md5_fname, mode = 'rt', encoding = 'utf-8-sig')
md5_expected = md5_file.readline().strip('\n')
md5_file.close()
_log.debug('expected MD5: %s', md5_expected)
md5_calculated = gmTools.file2md5(dp_fname, return_hex = True)
_log.debug('calculated MD5: %s', md5_calculated)
if md5_calculated != md5_expected:
_log.error('mismatch of expected vs calculated MD5: [%s] vs [%s]', md5_expected, md5_calculated)
return (False, (md5_expected, md5_calculated))
return True, dp_fname
#---------------------------------------------------------------------------
def unzip_data_pack(filename=None):
unzip_dir = os.path.splitext(filename)[0]
_log.debug('unzipping data pack into [%s]', unzip_dir)
gmTools.mkdir(unzip_dir)
try:
data_pack = zipfile.ZipFile(filename, 'r')
except (zipfile.BadZipfile):
_log.exception('cannot unzip data pack [%s]', filename)
gmLog2.log_stack_trace()
return None
data_pack.extractall(unzip_dir)
return unzip_dir
#---------------------------------------------------------------------------
def install_data_pack(data_pack=None, conn=None):
from Gnumed.pycommon import gmPsql
psql = gmPsql.Psql(conn)
sql_script = os.path.join(data_pack['unzip_dir'], 'install-data-pack.sql')
if psql.run(sql_script) == 0:
curs = conn.cursor()
curs.execute('select gm.log_script_insertion(%(name)s, %(ver)s)', {'name': data_pack['pack_url'], 'ver': 'current'})
curs.close()
conn.commit()
return True
_log.error('error installing data pack: %s', data_pack)
return False
#===========================================================================
# client update handling
#---------------------------------------------------------------------------
def compare_versions(left_version, right_version):
"""
0: left == right
-1: left < right
1: left > right
"""
_log.debug('comparing [%s] with [%s]', left_version, right_version)
if left_version == right_version:
_log.debug('same version')
return 0
if right_version in ['head', 'dev', 'devel']:
_log.debug('development code')
return -1
if left_version in ['head', 'dev', 'devel']:
_log.debug('development code')
return 1
left_parts = left_version.split('.')
right_parts = right_version.split('.')
tmp, left_major = gmTools.input2decimal('%s.%s' % (left_parts[0], left_parts[1]))
tmp, right_major = gmTools.input2decimal('%s.%s' % (right_parts[0], right_parts[1]))
if left_major < right_major:
_log.debug('left version [%s] < right version [%s]: major part', left_version, right_version)
return -1
if left_major > right_major:
_log.debug('left version [%s] > right version [%s]: major part', left_version, right_version)
return 1
tmp, left_part3 = gmTools.input2decimal(left_parts[2].replace('rc', '0.'))
tmp, right_part3 = gmTools.input2decimal(right_parts[2].replace('rc', '0.'))
if left_part3 < right_part3:
_log.debug('left version [%s] < right version [%s]: minor part', left_version, right_version)
return -1
if left_part3 > right_part3:
_log.debug('left version [%s] > right version [%s]: minor part', left_version, right_version)
return 1
return 0
#---------------------------------------------------------------------------
def check_for_update(url=None, current_branch=None, current_version=None, consider_latest_branch=False):
"""Check for new releases at <url>.
Returns (bool, text).
True: new release available
False: up to date
None: don't know
"""
if current_version is None:
_log.debug('<current_version> is None, currency unknown')
return (None, None)
if current_version.casefold() in ['git head', 'head', 'tip', 'dev', 'devel']:
_log.debug('[%s] always considered up to date', current_version)
return (False, None)
try:
remote_file = urllib.request.urlopen(url)
except (urllib.error.URLError, ValueError, OSError, IOError):
# IOError: socket.error
_log.exception("cannot retrieve version file from [%s]", url)
return (None, _('Cannot retrieve version information from:\n\n%s') % url)
_log.debug('retrieving version information from [%s]', url)
cfg = gmCfgINI.gmCfgData()
try:
#remote_file.read().decode(resource.headers.get_content_charset())
cfg.add_stream_source(source = 'gm-versions', stream = remote_file, encoding = u'utf8')
except (UnicodeDecodeError):
remote_file.close()
_log.exception("cannot read version file from [%s]", url)
return (None, _('Cannot read version information from:\n\n%s') % url)
remote_file.close()
latest_branch = cfg.get('latest branch', 'branch', source_order = [('gm-versions', 'return')])
latest_release_on_latest_branch = cfg.get('branch %s' % latest_branch, 'latest release', source_order = [('gm-versions', 'return')])
latest_release_on_current_branch = cfg.get('branch %s' % current_branch, 'latest release', source_order = [('gm-versions', 'return')])
cfg.remove_source('gm-versions')
_log.info('current release: %s', current_version)
_log.info('current branch: %s', current_branch)
_log.info('latest release on current branch: %s', latest_release_on_current_branch)
_log.info('latest branch: %s', latest_branch)
_log.info('latest release on latest branch: %s', latest_release_on_latest_branch)
# anything known ?
no_release_information_available = (
(
(latest_release_on_current_branch is None) and
(latest_release_on_latest_branch is None)
) or (
not consider_latest_branch and
(latest_release_on_current_branch is None)
)
)
if no_release_information_available:
_log.warning('no release information available')
msg = _('There is no version information available from:\n\n%s') % url
return (None, msg)
# up to date ?
if consider_latest_branch:
_log.debug('latest branch taken into account')
if latest_release_on_latest_branch is None:
if compare_versions(latest_release_on_current_branch, current_version) in [-1, 0]:
_log.debug('up to date: current version >= latest version on current branch and no latest branch available')
return (False, None)
else:
if compare_versions(latest_release_on_latest_branch, current_version) in [-1, 0]:
_log.debug('up to date: current version >= latest version on latest branch')
return (False, None)
else:
_log.debug('latest branch not taken into account')
if compare_versions(latest_release_on_current_branch, current_version) in [-1, 0]:
_log.debug('up to date: current version >= latest version on current branch')
return (False, None)
new_release_on_current_branch_available = (
(latest_release_on_current_branch is not None) and
(compare_versions(latest_release_on_current_branch, current_version) == 1)
)
_log.info('%snew release on current branch available', gmTools.bool2str(new_release_on_current_branch_available, '', 'no '))
new_release_on_latest_branch_available = (
(latest_branch is not None)
and
(
(latest_branch > current_branch) or (
(latest_branch == current_branch) and
(compare_versions(latest_release_on_latest_branch, current_version) == 1)
)
)
)
_log.info('%snew release on latest branch available', gmTools.bool2str(new_release_on_latest_branch_available, '', 'no '))
if not (new_release_on_current_branch_available or new_release_on_latest_branch_available):
_log.debug('up to date: no new releases available')
return (False, None)
# not up to date
msg = _('A new version of GNUmed is available.\n\n')
msg += _(' Your current version: "%s"\n') % current_version
if consider_latest_branch:
if new_release_on_current_branch_available:
msg += '\n'
msg += _(' New version: "%s"') % latest_release_on_current_branch
msg += '\n'
msg += _(' - bug fixes only\n')
msg += _(' - database fixups may be needed\n')
if new_release_on_latest_branch_available:
if current_branch != latest_branch:
msg += '\n'
msg += _(' New version: "%s"') % latest_release_on_latest_branch
msg += '\n'
msg += _(' - bug fixes and new features\n')
msg += _(' - database upgrade required\n')
else:
msg += '\n'
msg += _(' New version: "%s"') % latest_release_on_current_branch
msg += '\n'
msg += _(' - bug fixes only\n')
msg += _(' - database fixups may be needed\n')
msg += '\n\n'
msg += _(
'Note, however, that this version may not yet\n'
'be available *pre-packaged* for your system.'
)
msg += '\n\n'
msg += _('Details are found on <https://www.gnumed.de>.\n')
msg += '\n'
msg += _('Version information loaded from:\n\n %s') % url
return (True, msg)
#===========================================================================
# mail handling
#---------------------------------------------------------------------------
default_mail_sender = 'gnumed@gmx.net'
default_mail_receiver = 'gnumed-devel@gnu.org'
default_mail_server = 'mail.gmx.net'
#---------------------------------------------------------------------------
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.image import MIMEImage
from email.mime.audio import MIMEAudio
from email.mime.application import MIMEApplication
def compose_email(sender=None, receiver=None, message=None, subject=None, files2attach=None):
# filenames unicode
# file content binary or utf8
# files2attach = [(filename, mimetype-or-None), ...]
if message is None:
raise ValueError('<message> is None, cannot compose email')
message = message.lstrip().lstrip('\r\n').lstrip()
if sender is None:
sender = default_mail_sender
if receiver is None:
receiver = [default_mail_receiver]
if subject is None:
subject = 'compose_email() test'
if files2attach is None:
email = MIMEText(message, 'plain', 'utf8')
else:
email = MIMEMultipart()
email.attach(MIMEText(message, 'plain', 'utf8'))
email['From'] = sender
email['To'] = ', '.join(receiver)
email['Subject'] = subject
if files2attach is None:
return email
for file2attach in files2attach:
filename = file2attach[0]
try:
mimetype = file2attach[1]
except IndexError:
mimetype = gmMimeLib.guess_mimetype(filename = filename)
# text/*
if mimetype.startswith('text/'):
txt = open(filename, mode = 'rt', encoding = 'utf8')
attachment = MIMEText(txt.read(), 'plain', 'utf8')
txt.close()
# image/*
elif mimetype.startswith('image/'):
img = open(filename, mode = 'rb')
attachment = MIMEImage(img.read())
img.close()
# audio/*
elif mimetype.startswith('audio/'):
song = open(filename, mode = 'rb')
attachment = MIMEAudio(song.read())
song.close()
# catch-all application/*
else:
_log.debug('attaching [%s] with type [%s]', filename, mimetype)
mime_subtype = mimetype.split('/', 1)[1]
data = open(filename, mode = 'rb')
attachment = MIMEApplication(data.read(), mime_subtype)
data.close()
try:
attachment.replace_header('Content-Disposition', 'attachment; filename="%s"' % gmTools.fname_from_path(filename))
except KeyError:
attachment.add_header('Content-Disposition', 'attachment; filename="%s"' % gmTools.fname_from_path(filename))
email.attach(attachment)
return email
#---------------------------------------------------------------------------
def send_email(sender=None, receiver=None, email=None, server=None, auth=None, debug=False):
if email is None:
raise ValueError('<email> is None, cannot send')
if sender is None:
sender = default_mail_sender
if receiver is None:
receiver = [default_mail_receiver]
if server is None:
server = default_mail_server
import smtplib
failed = False
refused = []
try:
session = smtplib.SMTP(server)
session.set_debuglevel(debug)
try:
session.starttls()
except smtplib.SMTPException:
_log.error('cannot enable TLS on [%s]', server)
session.ehlo()
if auth is not None:
session.login(auth['user'], auth['password'])
refused = session.sendmail(sender, receiver, email.as_string())
session.quit()
except smtplib.SMTPException:
failed = True
_log.exception('cannot send email')
gmLog2.log_stack_trace()
if len(refused) > 0:
_log.error("refused recipients: %s" % refused)
if failed:
return False
return True
#---------------------------------------------------------------------------
def compose_and_send_email(sender=None, receiver=None, message=None, server=None, auth=None, debug=False, subject=None, attachments=None):
email = compose_email (
sender = sender,
receiver = receiver,
message = message,
subject = subject,
files2attach = attachments
)
return send_email (
sender = sender,
receiver = receiver,
email = email,
server = server,
auth = auth,
debug = debug
)
#===========================================================================
# main
#---------------------------------------------------------------------------
if __name__ == '__main__':
if len(sys.argv) < 2:
sys.exit()
if sys.argv[1] != 'test':
sys.exit()
#-----------------------------------------------------------------------
def test_compose_email():
email = compose_email (
message = 'compose_email() test: üü ßß',
files2attach = [[sys.argv[2]]]
)
print(email.as_string())
return email
#-----------------------------------------------------------------------
def test_send_email():
email = compose_email (
message = 'compose_email() test: üü ßß',
files2attach = [[sys.argv[2]]]
)
print(send_email (
# receiver = u'ncq@localhost',
email = email,
# server = 'localhost',
auth = {'user': default_mail_sender, 'password': 'gnumed-at-gmx-net'},
debug = True
))
#-----------------------------------------------------------------------
def test_check_for_update():
test_data = [
('https://www.gnumed.de/downloads/gnumed-versions.txt', None, None, False),
('file:///home/ncq/gm-versions.txt', None, None, False),
('file:///home/ncq/gm-versions.txt', '0.2', '0.2.8.1', False),
('file:///home/ncq/gm-versions.txt', '0.2', '0.2.8.1', True),
('file:///home/ncq/gm-versions.txt', '0.2', '0.2.8.5', True)
]
for test in test_data:
print("arguments:", test)
found, msg = check_for_update(test[0], test[1], test[2], test[3])
print(msg)
return
#-----------------------------------------------------------------------
def test_dl_data_pack():
#url = 'file:./x-data_pack.zip'
#url = 'missing-file.zip'
url = 'gmTools.py'
dl_name = download_data_pack(url)
print(url, "->", dl_name)
unzip_dir = unzip_data_pack(dl_name)
print("unzipped into", unzip_dir)
#-----------------------------------------------------------------------
def test_browser():
success = open_url_in_browser(sys.argv[2])
print(success)
open_url_in_browser(sys.argv[2])
#-----------------------------------------------------------------------
def test_mirror_url():
mirror_url(url = sys.argv[2])
#-----------------------------------------------------------------------
#test_check_for_update()
#test_compose_email()
#test_send_email()
#test_dl_data_pack()
test_browser()
#test_mirror_url()
#===========================================================================
Functions
def check_for_update(url=None, current_branch=None, current_version=None, consider_latest_branch=False)
-
Check for new releases at
. Returns (bool, text). True: new release available False: up to date None: don't know
Expand source code
def check_for_update(url=None, current_branch=None, current_version=None, consider_latest_branch=False): """Check for new releases at <url>. Returns (bool, text). True: new release available False: up to date None: don't know """ if current_version is None: _log.debug('<current_version> is None, currency unknown') return (None, None) if current_version.casefold() in ['git head', 'head', 'tip', 'dev', 'devel']: _log.debug('[%s] always considered up to date', current_version) return (False, None) try: remote_file = urllib.request.urlopen(url) except (urllib.error.URLError, ValueError, OSError, IOError): # IOError: socket.error _log.exception("cannot retrieve version file from [%s]", url) return (None, _('Cannot retrieve version information from:\n\n%s') % url) _log.debug('retrieving version information from [%s]', url) cfg = gmCfgINI.gmCfgData() try: #remote_file.read().decode(resource.headers.get_content_charset()) cfg.add_stream_source(source = 'gm-versions', stream = remote_file, encoding = u'utf8') except (UnicodeDecodeError): remote_file.close() _log.exception("cannot read version file from [%s]", url) return (None, _('Cannot read version information from:\n\n%s') % url) remote_file.close() latest_branch = cfg.get('latest branch', 'branch', source_order = [('gm-versions', 'return')]) latest_release_on_latest_branch = cfg.get('branch %s' % latest_branch, 'latest release', source_order = [('gm-versions', 'return')]) latest_release_on_current_branch = cfg.get('branch %s' % current_branch, 'latest release', source_order = [('gm-versions', 'return')]) cfg.remove_source('gm-versions') _log.info('current release: %s', current_version) _log.info('current branch: %s', current_branch) _log.info('latest release on current branch: %s', latest_release_on_current_branch) _log.info('latest branch: %s', latest_branch) _log.info('latest release on latest branch: %s', latest_release_on_latest_branch) # anything known ? no_release_information_available = ( ( (latest_release_on_current_branch is None) and (latest_release_on_latest_branch is None) ) or ( not consider_latest_branch and (latest_release_on_current_branch is None) ) ) if no_release_information_available: _log.warning('no release information available') msg = _('There is no version information available from:\n\n%s') % url return (None, msg) # up to date ? if consider_latest_branch: _log.debug('latest branch taken into account') if latest_release_on_latest_branch is None: if compare_versions(latest_release_on_current_branch, current_version) in [-1, 0]: _log.debug('up to date: current version >= latest version on current branch and no latest branch available') return (False, None) else: if compare_versions(latest_release_on_latest_branch, current_version) in [-1, 0]: _log.debug('up to date: current version >= latest version on latest branch') return (False, None) else: _log.debug('latest branch not taken into account') if compare_versions(latest_release_on_current_branch, current_version) in [-1, 0]: _log.debug('up to date: current version >= latest version on current branch') return (False, None) new_release_on_current_branch_available = ( (latest_release_on_current_branch is not None) and (compare_versions(latest_release_on_current_branch, current_version) == 1) ) _log.info('%snew release on current branch available', gmTools.bool2str(new_release_on_current_branch_available, '', 'no ')) new_release_on_latest_branch_available = ( (latest_branch is not None) and ( (latest_branch > current_branch) or ( (latest_branch == current_branch) and (compare_versions(latest_release_on_latest_branch, current_version) == 1) ) ) ) _log.info('%snew release on latest branch available', gmTools.bool2str(new_release_on_latest_branch_available, '', 'no ')) if not (new_release_on_current_branch_available or new_release_on_latest_branch_available): _log.debug('up to date: no new releases available') return (False, None) # not up to date msg = _('A new version of GNUmed is available.\n\n') msg += _(' Your current version: "%s"\n') % current_version if consider_latest_branch: if new_release_on_current_branch_available: msg += '\n' msg += _(' New version: "%s"') % latest_release_on_current_branch msg += '\n' msg += _(' - bug fixes only\n') msg += _(' - database fixups may be needed\n') if new_release_on_latest_branch_available: if current_branch != latest_branch: msg += '\n' msg += _(' New version: "%s"') % latest_release_on_latest_branch msg += '\n' msg += _(' - bug fixes and new features\n') msg += _(' - database upgrade required\n') else: msg += '\n' msg += _(' New version: "%s"') % latest_release_on_current_branch msg += '\n' msg += _(' - bug fixes only\n') msg += _(' - database fixups may be needed\n') msg += '\n\n' msg += _( 'Note, however, that this version may not yet\n' 'be available *pre-packaged* for your system.' ) msg += '\n\n' msg += _('Details are found on <https://www.gnumed.de>.\n') msg += '\n' msg += _('Version information loaded from:\n\n %s') % url return (True, msg)
def compare_versions(left_version, right_version)
-
0: left == right -1: left < right 1: left > right
Expand source code
def compare_versions(left_version, right_version): """ 0: left == right -1: left < right 1: left > right """ _log.debug('comparing [%s] with [%s]', left_version, right_version) if left_version == right_version: _log.debug('same version') return 0 if right_version in ['head', 'dev', 'devel']: _log.debug('development code') return -1 if left_version in ['head', 'dev', 'devel']: _log.debug('development code') return 1 left_parts = left_version.split('.') right_parts = right_version.split('.') tmp, left_major = gmTools.input2decimal('%s.%s' % (left_parts[0], left_parts[1])) tmp, right_major = gmTools.input2decimal('%s.%s' % (right_parts[0], right_parts[1])) if left_major < right_major: _log.debug('left version [%s] < right version [%s]: major part', left_version, right_version) return -1 if left_major > right_major: _log.debug('left version [%s] > right version [%s]: major part', left_version, right_version) return 1 tmp, left_part3 = gmTools.input2decimal(left_parts[2].replace('rc', '0.')) tmp, right_part3 = gmTools.input2decimal(right_parts[2].replace('rc', '0.')) if left_part3 < right_part3: _log.debug('left version [%s] < right version [%s]: minor part', left_version, right_version) return -1 if left_part3 > right_part3: _log.debug('left version [%s] > right version [%s]: minor part', left_version, right_version) return 1 return 0
def compose_and_send_email(sender=None, receiver=None, message=None, server=None, auth=None, debug=False, subject=None, attachments=None)
-
Expand source code
def compose_and_send_email(sender=None, receiver=None, message=None, server=None, auth=None, debug=False, subject=None, attachments=None): email = compose_email ( sender = sender, receiver = receiver, message = message, subject = subject, files2attach = attachments ) return send_email ( sender = sender, receiver = receiver, email = email, server = server, auth = auth, debug = debug )
def compose_email(sender=None, receiver=None, message=None, subject=None, files2attach=None)
-
Expand source code
def compose_email(sender=None, receiver=None, message=None, subject=None, files2attach=None): # filenames unicode # file content binary or utf8 # files2attach = [(filename, mimetype-or-None), ...] if message is None: raise ValueError('<message> is None, cannot compose email') message = message.lstrip().lstrip('\r\n').lstrip() if sender is None: sender = default_mail_sender if receiver is None: receiver = [default_mail_receiver] if subject is None: subject = 'compose_email() test' if files2attach is None: email = MIMEText(message, 'plain', 'utf8') else: email = MIMEMultipart() email.attach(MIMEText(message, 'plain', 'utf8')) email['From'] = sender email['To'] = ', '.join(receiver) email['Subject'] = subject if files2attach is None: return email for file2attach in files2attach: filename = file2attach[0] try: mimetype = file2attach[1] except IndexError: mimetype = gmMimeLib.guess_mimetype(filename = filename) # text/* if mimetype.startswith('text/'): txt = open(filename, mode = 'rt', encoding = 'utf8') attachment = MIMEText(txt.read(), 'plain', 'utf8') txt.close() # image/* elif mimetype.startswith('image/'): img = open(filename, mode = 'rb') attachment = MIMEImage(img.read()) img.close() # audio/* elif mimetype.startswith('audio/'): song = open(filename, mode = 'rb') attachment = MIMEAudio(song.read()) song.close() # catch-all application/* else: _log.debug('attaching [%s] with type [%s]', filename, mimetype) mime_subtype = mimetype.split('/', 1)[1] data = open(filename, mode = 'rb') attachment = MIMEApplication(data.read(), mime_subtype) data.close() try: attachment.replace_header('Content-Disposition', 'attachment; filename="%s"' % gmTools.fname_from_path(filename)) except KeyError: attachment.add_header('Content-Disposition', 'attachment; filename="%s"' % gmTools.fname_from_path(filename)) email.attach(attachment) return email
def download_data_pack(pack_url, filename=None, md5_url=None)
-
Expand source code
def download_data_pack(pack_url, filename=None, md5_url=None): _log.debug('downloading data pack from: %s', pack_url) dp_fname = download_file(pack_url, filename = filename, suffix = 'zip') _log.debug('downloading MD5 from: %s', md5_url) md5_fname = download_file(md5_url, filename = dp_fname + '.md5') md5_file = open(md5_fname, mode = 'rt', encoding = 'utf-8-sig') md5_expected = md5_file.readline().strip('\n') md5_file.close() _log.debug('expected MD5: %s', md5_expected) md5_calculated = gmTools.file2md5(dp_fname, return_hex = True) _log.debug('calculated MD5: %s', md5_calculated) if md5_calculated != md5_expected: _log.error('mismatch of expected vs calculated MD5: [%s] vs [%s]', md5_expected, md5_calculated) return (False, (md5_expected, md5_calculated)) return True, dp_fname
def download_data_packs_list(url, filename=None)
-
Expand source code
def download_data_packs_list(url, filename=None): return download_file(url, filename = filename, suffix = 'conf')
def download_file(url, filename=None, suffix=None)
-
Expand source code
def download_file(url, filename=None, suffix=None): if filename is None: filename = gmTools.get_unique_filename(prefix = 'gm-dl-', suffix = suffix) _log.debug('downloading [%s] into [%s]', url, filename) try: dl_name, headers = urllib.request.urlretrieve(url, filename) except (ValueError, OSError, IOError): _log.exception('cannot download from [%s]', url) gmLog2.log_stack_trace() return None _log.debug('%s' % headers) return dl_name
def install_data_pack(data_pack=None, conn=None)
-
Expand source code
def install_data_pack(data_pack=None, conn=None): from Gnumed.pycommon import gmPsql psql = gmPsql.Psql(conn) sql_script = os.path.join(data_pack['unzip_dir'], 'install-data-pack.sql') if psql.run(sql_script) == 0: curs = conn.cursor() curs.execute('select gm.log_script_insertion(%(name)s, %(ver)s)', {'name': data_pack['pack_url'], 'ver': 'current'}) curs.close() conn.commit() return True _log.error('error installing data pack: %s', data_pack) return False
def mirror_url(url: str, base_dir: str = None, verbose: bool = False) ‑> str
-
Mirror the webpage at url, non-recursively.
Note: Not for mirroring a site (recursively).
Args
url
- the URL to mirror
base_dir
- where to store the page and its prerequisites, sandbox dir under tmp_dir if None
Expand source code
def mirror_url(url:str, base_dir:str=None, verbose:bool=False) -> str: """Mirror the web*page* at _url_, non-recursively. Note: Not for mirroring a *site* (recursively). Args: url: the URL to mirror base_dir: where to store the page and its prerequisites, sandbox dir under tmp_dir if None """ assert (url is not None), '<url> must not be None' _log.debug('mirroring: %s', url) if base_dir is None: prefix = url.split('://')[-1] prefix = prefix.strip(':').strip('/').replace('/', '#') prefix = gmTools.fname_sanitize(prefix) base_dir = gmTools.mk_sandbox_dir(prefix = prefix + '-') _log.debug('base dir: %s', base_dir) wget_cmd = [ 'wget', '--directory-prefix=%s' % base_dir, #'--adjust-extension', '--no-remove-listing', '--timestamping', '--page-requisites', '--continue', '--convert-links', '--user-agent=""', '--execute', 'robots=off', '--wait=1' ] if verbose: wget_cmd.append('--debug') wget_cmd.append(url) #wget --output-file=logfile #'<a href="%s">%s</a>' % (url, url), success, ret_code, STDOUT = gmShellAPI.run_process(cmd_line = wget_cmd, timeout = 15, verbose = verbose) if success: return base_dir return None
def open_url_in_browser(url: str = None, new: int = 2, autoraise: bool = True) ‑> bool
-
Open an URL in a browser.
Args
url
- URL to open
new
- whether to open a new browser, a new tab in a running browser, or a tab OR a browser, 2 = open new tab if possible
Expand source code
def open_url_in_browser(url:str=None, new:int=2, autoraise:bool=True) -> bool: """Open an URL in a browser. Args: url: URL to open new: whether to open a new browser, a new tab in a running browser, or a tab OR a browser, 2 = open new tab if possible """ if not url: return True try: webbrowser.open(url, new = new, autoraise = autoraise) except (webbrowser.Error, OSError, UnicodeEncodeError): _log.exception('error calling browser with url=[%s]', url) return False return True
def open_urls_in_browser(urls: list[str] = None, new: int = 2, autoraise: bool = True, initial_delay: int = 200, *args, **kwargs) ‑> bool
-
Open URLs in a browser.
Args
urls
- URLs to open
new
- whether to open a new browser, a new tab in a running browser, or a tab OR a browser (see Python docs), 2 = open new tab if possible
initial_delay
- milliseconds to sleep after opening first URL (so that the browser may start up)
Expand source code
def open_urls_in_browser(urls:list[str]=None, new:int=2, autoraise:bool=True, initial_delay:int=200, *args, **kwargs) -> bool: """Open URLs in a browser. Args: urls: URLs to open new: whether to open a new browser, a new tab in a running browser, or a tab OR a browser (see Python docs), 2 = open new tab if possible initial_delay: milliseconds to sleep after opening first URL (so that the browser may start up) """ if not urls: return True # can't timewarp just yet duration2sleep = max(initial_delay, 0) / 1000 for url in urls: open_url_in_browser(url = url, new = new, autoraise = autoraise) time.sleep(duration2sleep) duration2sleep = 0 return True
def send_email(sender=None, receiver=None, email=None, server=None, auth=None, debug=False)
-
Expand source code
def send_email(sender=None, receiver=None, email=None, server=None, auth=None, debug=False): if email is None: raise ValueError('<email> is None, cannot send') if sender is None: sender = default_mail_sender if receiver is None: receiver = [default_mail_receiver] if server is None: server = default_mail_server import smtplib failed = False refused = [] try: session = smtplib.SMTP(server) session.set_debuglevel(debug) try: session.starttls() except smtplib.SMTPException: _log.error('cannot enable TLS on [%s]', server) session.ehlo() if auth is not None: session.login(auth['user'], auth['password']) refused = session.sendmail(sender, receiver, email.as_string()) session.quit() except smtplib.SMTPException: failed = True _log.exception('cannot send email') gmLog2.log_stack_trace() if len(refused) > 0: _log.error("refused recipients: %s" % refused) if failed: return False return True
def unzip_data_pack(filename=None)
-
Expand source code
def unzip_data_pack(filename=None): unzip_dir = os.path.splitext(filename)[0] _log.debug('unzipping data pack into [%s]', unzip_dir) gmTools.mkdir(unzip_dir) try: data_pack = zipfile.ZipFile(filename, 'r') except (zipfile.BadZipfile): _log.exception('cannot unzip data pack [%s]', filename) gmLog2.log_stack_trace() return None data_pack.extractall(unzip_dir) return unzip_dir