Module Gnumed.pycommon.gmSerialTools
GNUmed serial port tools.
These functions complement pySerial.
Expand source code
"""GNUmed serial port tools.

These functions complement pySerial.
"""
#===========================================================================
__version__ = "$Revision: 1.3 $"
__author__ = "K.Hilbert <Karsten.Hilbert@gmx.net>"
__licence__ = "GPL v2 or later (details at https://www.gnu.org)"
import time, logging
# module-level logger used by the helper functions in this module
_log = logging.getLogger('gm.serial')
# top-level statement: writes the module revision to the log when this module is executed
_log.info(__version__)
#----------------------------------------------------
# general utility
#----------------------------------------------------
def wait_for_str(aDrv = None, aString = '', aTimeout = 2500, max_bytes = 2048):
"""Wait for a particular string with timeout.
- timeout in milliseconds, please
"""
if aString == '':
return (1, '')
if aDrv is None:
_log.error("need source for incoming data")
return (0, '')
if max_bytes < len(aString):
max_bytes = len(aString) + 1
rxd = ''
loop = 0
slice = 100
# how many loops ?
max_loops = abs(aTimeout / slice)
# wait for data
while loop < max_loops:
loop += 1
# something there
if aDrv.inWaiting() > 0:
# get all there is
while aDrv.inWaiting() > 0:
rxd = rxd + aDrv.read(size = 1)
# did this contain our expected string already ?
if rxd.find(aString) > -1:
return (1, rxd)
# did we exceed our character buffer limit ?
# this stops runaway serial ports
if len(rxd) >= max_bytes:
_log.error('exceeded maximum # of bytes (%s) to receive' % max_bytes)
return (0, rxd)
# nothing there, wait a slice
else:
if len(rxd) >= max_bytes:
_log.error('exceeded maximum # of bytes to receive')
return (0, rxd)
time.sleep(float(slice) / 1000)
# hm, waited for aTimeout but expected string not received
_log.warning('wait for [%s] timed out after %s ms', aString, aTimeout)
_log.debug(rxd)
return (0, rxd)
#--------------------------------------------------------
def wait_for_data(aDrv = None, aTimeout = 2500):
    """Poll a driver until it reports pending input or the timeout expires.

    Args:
        aDrv: source of incoming data; must offer inWaiting().
        aTimeout: timeout in milliseconds; it is converted into a number
            of 100 ms polls.

    Returns:
        1 as soon as inWaiting() reports something other than 0,
        0 when no driver was given or all polls came up empty.
    """
    if aDrv is None:
        _log.error("Need source for incoming data !")
        return 0

    poll_interval_ms = 100
    polls_allowed = abs(aTimeout / poll_interval_ms)
    polls_done = 0
    while polls_done < polls_allowed:
        if aDrv.inWaiting() != 0:
            # something arrived, no need to wait any longer
            return 1
        # still nothing: count this poll and sleep for one interval
        polls_done += 1
        time.sleep(float(poll_interval_ms) / 1000)

    # every poll came up empty
    _log.warning('Timed out after %s ms while waiting for data.' % aTimeout)
    return 0
Functions
def wait_for_data(aDrv=None, aTimeout=2500)
-
Wait for any incoming data, with a timeout.
- timeout in milliseconds, please
Expand source code
def wait_for_data(aDrv = None, aTimeout = 2500):
    """Poll a driver until it reports pending input or the timeout expires.

    Args:
        aDrv: source of incoming data; must offer inWaiting().
        aTimeout: timeout in milliseconds; it is converted into a number
            of 100 ms polls.

    Returns:
        1 as soon as inWaiting() reports something other than 0,
        0 when no driver was given or all polls came up empty.
    """
    if aDrv is None:
        _log.error("Need source for incoming data !")
        return 0

    poll_interval_ms = 100
    polls_allowed = abs(aTimeout / poll_interval_ms)
    polls_done = 0
    while polls_done < polls_allowed:
        if aDrv.inWaiting() != 0:
            # something arrived, no need to wait any longer
            return 1
        # still nothing: count this poll and sleep for one interval
        polls_done += 1
        time.sleep(float(poll_interval_ms) / 1000)

    # every poll came up empty
    _log.warning('Timed out after %s ms while waiting for data.' % aTimeout)
    return 0
def wait_for_str(aDrv=None, aString='', aTimeout=2500, max_bytes=2048)
-
Wait for a particular string with timeout.
- timeout in milliseconds, please
Expand source code
def wait_for_str(aDrv = None, aString = '', aTimeout = 2500, max_bytes = 2048): """Wait for a particular string with timeout. - timeout in milliseconds, please """ if aString == '': return (1, '') if aDrv is None: _log.error("need source for incoming data") return (0, '') if max_bytes < len(aString): max_bytes = len(aString) + 1 rxd = '' loop = 0 slice = 100 # how many loops ? max_loops = abs(aTimeout / slice) # wait for data while loop < max_loops: loop += 1 # something there if aDrv.inWaiting() > 0: # get all there is while aDrv.inWaiting() > 0: rxd = rxd + aDrv.read(size = 1) # did this contain our expected string already ? if rxd.find(aString) > -1: return (1, rxd) # did we exceed our character buffer limit ? # this stops runaway serial ports if len(rxd) >= max_bytes: _log.error('exceeded maximum # of bytes (%s) to receive' % max_bytes) return (0, rxd) # nothing there, wait a slice else: if len(rxd) >= max_bytes: _log.error('exceeded maximum # of bytes to receive') return (0, rxd) time.sleep(float(slice) / 1000) # hm, waited for aTimeout but expected string not received _log.warning('wait for [%s] timed out after %s ms', aString, aTimeout) _log.debug(rxd) return (0, rxd)