Source code for s3workers.jobs

import threading


class Job(object):
    '''Base class for a unit of work that can be asked to stop

    Subclasses supply the actual work by overriding :meth:`_runner`. The job
    tracks a coarse state string (``'ready'``, ``'running'``, ``'stopped'``)
    and a :class:`threading.Event` that records whether a stop was requested.
    '''

    def __init__(self):
        self._state = 'ready'
        self._stop_requested = threading.Event()

    def run(self, *args, **kwargs):
        '''Execute the job unless a stop has already been requested

        All positional and keyword arguments are forwarded to
        :meth:`_runner`. The state is ``'running'`` while the runner executes
        and is always left as ``'stopped'`` afterwards, including when the
        runner raises (the exception propagates) or when it was skipped
        because :meth:`stop` had been called beforehand.
        '''
        try:
            if not self._stop_requested.is_set():
                self._state = 'running'
                self._runner(*args, **kwargs)
        finally:
            self._state = 'stopped'

    def stop(self):
        '''Request that the job stop

        This only sets the stop event; it is up to :meth:`_runner`
        implementations to check ``self._stop_requested`` and bail out.
        '''
        self._stop_requested.set()

    def __str__(self, details=''):
        '''Describe the job as ``ClassName(details) is state``

        :param details: optional text shown between the parentheses
        '''
        return '%s(%s) is %s' % (self.__class__.__name__, details, self._state)

    def _runner(self, *args, **kwargs):
        '''Hook performing the job's work; the default does nothing

        Accepts (and ignores) whatever :meth:`run` forwards so that running a
        bare job with arguments is a harmless no-op rather than a TypeError.
        '''
        pass
class S3ListJob(Job):
    '''Iterate through S3 objects invoking a callback for each

    :param bucket: the S3 bucket manager
    :param prefix: the key prefix to use when listing from the S3 bucket
    :param selector: optional expression passed to ``eval()`` to decide when
                     an object is "interesting"; it may refer to the object's
                     ``size``, ``name`` and ``md5``
    :param key_handler: the callback to be invoked for each selected object
                        (or all if no selector provided)
    :param progress: the callback to be invoked for reporting progress
                     through the listing
    '''

    def __init__(self, bucket, prefix, selector, key_handler, progress):
        # Name the class explicitly: super(self.__class__, self) resolves back
        # to this very method (infinite recursion) as soon as it is subclassed.
        super(S3ListJob, self).__init__()
        self._bucket = bucket
        self._prefix = prefix
        self._selector = selector
        self._key_handler = key_handler
        self._progress = progress

    def __str__(self):
        '''Describe the job, using ``bucket-name/prefix`` as the details'''
        return super(S3ListJob, self).__str__(self._bucket.name + '/' + self._prefix)

    def _runner(self):
        '''List the bucket under the prefix, handing selected keys to the handler

        Stops early once a stop has been requested. The progress callback is
        invoked once per listed key, whether or not the key is selected.
        '''
        for key in self._bucket.list(prefix=self._prefix):
            if self._stop_requested.is_set():
                break
            self._progress()
            if not key.md5:
                # GROSS. HACK. Likely break if multipart-uploaded...
                # Drops the first and last character of the etag (presumably
                # the surrounding quotes -- confirm against the S3 client).
                key.md5 = key.etag[1:-1]
            if self._is_selected(key):
                self._key_handler(key)

    def _is_selected(self, key):
        '''Return whether the key should be passed to the key handler

        Every key is selected when no selector was provided; otherwise the
        result of evaluating the selector expression is returned as-is.
        '''
        if not self._selector:
            return True
        # These locals exist only so the selector expression can refer to them.
        size = key.size  # noqa: F841
        name = key.name  # noqa: F841
        md5 = key.md5  # noqa: F841
        # SECURITY: the selector is evaluated as arbitrary Python code; it must
        # only ever come from a trusted source (e.g. the operator's own input).
        return eval(self._selector)