Source code for s3workers.worker

import logging
import threading
import queue

_logger = logging.getLogger(__name__)


[docs]class Worker(threading.Thread): '''Simple thread to continuously pull jobs off a work queue until told to stop or that no more jobs will be submitted. :param work_queue: the job queue to query for work ''' def __init__(self, work_queue): super(self.__class__, self).__init__() self._work_queue = work_queue self._current_lock = threading.Lock() self._current_job = None self._stop_requested = threading.Event() def __str__(self): statestr = 'alive' if self.is_alive() else 'dead' jobstr = ' current=' + str(self._current_job) if self._current_job else '' return '%s(%s%s)' % (self.__class__.__name__, statestr, jobstr)
[docs] def run(self): '''Run until a stop is requested or there is no more work expected.''' while True: if self._stop_requested.is_set(): break if self._work_queue.is_done(): break try: with self._current_lock: self._current_job = self._work_queue.get(timeout=0.1) except queue.Empty: continue try: _logger.debug('starting: %s', self._current_job) self._current_job.run() finally: self._work_queue.task_done() with self._current_lock: self._current_job = None
[docs] def stop(self): '''Finish any job currently in progress and then terminate.''' self._stop_requested.set() with self._current_lock: if self._current_job: self._current_job.stop()