"""Progress indication.

Provides thread safe printing of progress indication to stdout.

File Name   : spin.py
Requirement : Python
Author      : Branimir Petrovic
Submitted   : 10/02/2003
Category    : Python
"""

############################################################
# FileName spin.py
__author__ = "Branimir Petrovic"
__version__ = "1.0.0"
__date__ = "1 Feb 2003"
############################################################

import sys
import threading
import time


class ProgressBase(threading.Thread):
    """Base class - not to be instantiated directly.

    A subclass must set ``self.inplace`` before calling
    ``ProgressBase.__init__`` and must implement ``run()``.  ``run()`` is
    expected to hold ``self.cv`` for its whole lifetime and to poll
    ``_stopRequested()`` between two pieces of output.

    Attributes:
        rlock: Held by the worker thread for as long as ``run()`` executes.
        cv: Condition used by ``stop()`` to cut the worker's pause short.
        stopFlag: Set to 1 by ``stop()``; reset to 0 by ``start()``.
    """

    def __init__(self):
        self.rlock = threading.RLock()
        self.cv = threading.Condition()
        self.stopFlag = 0
        threading.Thread.__init__(self)
        # Daemon thread: a forgotten indicator never keeps the process alive.
        self.daemon = True

    def _write(self, text):
        """Write text to stdout and flush so the animation shows up at once."""
        sys.stdout.write(text)
        sys.stdout.flush()

    def _backStep(self):
        """Erase the last character printed by an in-place indicator."""
        if self.inplace:
            self._write('\b \b')

    # Alias (mangled to _ProgressBase__backStep) kept for existing callers.
    __backStep = _backStep

    def _stopRequested(self, timeout):
        """Pause for up to timeout seconds; return true once stop() was called.

        Must be called with ``self.cv`` held.  The flag is tested before
        waiting so that a stop requested while the worker was busy printing
        is honoured immediately instead of after one more full pause.
        """
        if not self.stopFlag:
            self.cv.wait(timeout)  # 'Sleeping Beauty' part
        return self.stopFlag

    def __call__(self):
        """Calling the instance is a shorthand for start()."""
        self.start()

    def start(self):
        """Clear the stop flag and launch the worker thread."""
        self.stopFlag = 0
        threading.Thread.start(self)

    def stop(self):
        """To be called by the 'main' thread.

        Blocks until the worker thread has left its run() method, then
        returns control to 'main'.  Calling it again, or calling it on an
        indicator that was never started, returns immediately.
        """
        # Set the flag under the condition's lock so the worker cannot miss
        # it, and wake up 'Sleeping Beauty' ahead of time if it is waiting.
        with self.cv:
            self.stopFlag = 1
            self.cv.notify()
        # join() (rather than grabbing rlock) waits for the real thread exit
        # and leaves no lock behind owned by the caller.
        if self.is_alive():
            self.join()


class Spinner(ProgressBase):
    r"""Print an 'animated' /|\ sequence to stdout in a separate thread.

    Args:
        speed: Seconds to pause between two animation frames.
    """

    def __init__(self, speed=0.1):
        self.__seq = ["/", "|", "\\"]
        self.__speed = speed
        self.inplace = 1
        ProgressBase.__init__(self)

    def run(self):
        """Overwrite one character cell with successive frames until stopped."""
        # Both locks are released by the with-statement on every exit path.
        with self.rlock, self.cv:
            self._write(' ')  # placeholder cell the first frame overwrites
            while True:
                for char in self.__seq:
                    if self._stopRequested(self.__speed):
                        self._backStep()
                        return
                    if self.inplace:
                        self._write('\b' + char)
                    else:
                        self._write(char)


class Dotter(ProgressBase):
    """Print an 'animated' sequence of dots - one per ``speed`` seconds.

    Args:
        speed: Seconds to pause before each dot (default: one dot per sec).
    """

    def __init__(self, speed=1):
        self.__seq = "."
        self.__speed = speed
        self.inplace = 0
        ProgressBase.__init__(self)

    def run(self):
        """Append one dot per interval until stopped."""
        # Both locks are released by the with-statement on every exit path.
        with self.rlock, self.cv:
            while not self._stopRequested(self.__speed):
                if self.inplace:
                    self._write('\b' + self.__seq)
                else:
                    self._write(self.__seq)
            self._backStep()


def _runWithIndicator(indicator, fnct, args, kwargs):
    """Run fnct(*args, **kwargs) while indicator animates; return its result.

    The indicator is stopped even when fnct raises, so no worker thread is
    left printing after the call.
    """
    indicator.start()
    try:
        return fnct(*args, **kwargs)
    finally:
        indicator.stop()


def spinIt(fnct, *args, **kwargs):
    """Display a spinner while fnct executes; return fnct's result."""
    # prints 'animated' /|\ sequence in place
    return _runWithIndicator(Spinner(speed=0.1), fnct, args, kwargs)


def dotItSlow(fnct, *args, **kwargs):
    """Display progress dots (1 per sec) while fnct executes; return its result."""
    return _runWithIndicator(Dotter(speed=1), fnct, args, kwargs)


def dotItFast(fnct, *args, **kwargs):
    """Display progress dots (5 per sec) while fnct executes; return its result."""
    return _runWithIndicator(Dotter(speed=0.2), fnct, args, kwargs)


def _demo():
    """Show each indicator style once per line of a short poem."""
    poem = ['Mary had a little lamb,',
            'Its fleece was white as snow.',
            'And everywhere that Mary went,',
            'The lamb was sure to go.',
            'It followed her to school one day,',
            'Which was against the rules.',
            'It made the children laugh and play,',
            'To see a lamb at school.']

    crunchingTime = 5.01
    msg = "Doing stuff for %s sec: " % crunchingTime

    # sys.stdout.write is used instead of print so that the demo runs
    # unchanged on both Python 2 and Python 3.
    for indicate in (spinIt, dotItSlow, dotItFast):
        sys.stdout.write('\n')
        for line in poem:
            sys.stdout.write(msg)
            indicate(time.sleep, crunchingTime)
            sys.stdout.write(line + '\n')


if __name__ == "__main__":
    _demo()
|||||
![]()
|
|