diff --git a/async/__init__.py b/async/__init__.py index 0432610..1ddca0e 100644 --- a/async/__init__.py +++ b/async/__init__.py @@ -11,13 +11,13 @@ def _init_atexit(): import atexit from . import thread atexit.register(thread.do_terminate_threads) - + def _init_signals(): """Assure we shutdown our threads correctly when being interrupted""" import signal from . import thread import sys - + prev_handler = signal.getsignal(signal.SIGINT) def thread_interrupt_handler(signum, frame): thread.do_terminate_threads() @@ -30,7 +30,7 @@ def thread_interrupt_handler(signum, frame): signal.signal(signal.SIGINT, thread_interrupt_handler) except ValueError: # happens if we don't try it from the main thread - print("Failed to setup thread-interrupt handler. This is usually not critical", file=sys.stderr) + sys.stderr.write("Failed to setup thread-interrupt handler. This is usually not critical\n") # END exception handling diff --git a/async/channel.py b/async/channel.py index 84a142e..c21ac8e 100644 --- a/async/channel.py +++ b/async/channel.py @@ -3,13 +3,19 @@ # This module is part of async and is released under # the New BSD License: http://www.opensource.org/licenses/bsd-license.php """Contains a queue based channel implementation""" -from queue import ( - Empty, - Full +try: + from queue import ( + Empty, + Full + ) +except ImportError: + from Queue import ( + Empty, + Full ) from .util import ( - AsyncQueue, + AsyncQueue, SyncQueue, ReadOnly ) @@ -19,27 +25,27 @@ import sys __all__ = ('Channel', 'SerialChannel', 'Writer', 'ChannelWriter', 'CallbackChannelWriter', - 'Reader', 'ChannelReader', 'CallbackChannelReader', 'mkchannel', 'ReadOnly', + 'Reader', 'ChannelReader', 'CallbackChannelReader', 'mkchannel', 'ReadOnly', 'IteratorReader', 'CallbackReaderMixin', 'CallbackWriterMixin') -#{ Classes +#{ Classes class Channel(object): """A channel is similar to a file like object. It has a write end as well as one or more read ends. 
If Data is in the channel, it can be read, if not the read operation will block until data becomes available. If the channel is closed, any read operation will result in an exception - + This base class is not instantiated directly, but instead serves as constructor for Rwriter pairs. - + Create a new channel """ __slots__ = 'queue' - + # The queue to use to store the actual data QueueCls = AsyncQueue - + def __init__(self): - """initialize this instance with a queue holding the channel contents""" + """initialize this instance with a queue holding the channel contents""" self.queue = self.QueueCls() @@ -51,106 +57,106 @@ class SerialChannel(Channel): class Writer(object): """A writer is an object providing write access to a possibly blocking reading device""" __slots__ = tuple() - + #{ Interface - + def __init__(self, device): """Initialize the instance with the device to write to""" - + def write(self, item, block=True, timeout=None): """Write the given item into the device :param block: True if the device may block until space for the item is available - :param timeout: The time in seconds to wait for the device to become ready + :param timeout: The time in seconds to wait for the device to become ready in blocking mode""" raise NotImplementedError() - + def size(self): """:return: number of items already in the device, they could be read with a reader""" raise NotImplementedError() - + def close(self): - """Close the channel. Multiple close calls on a closed channel are no + """Close the channel. 
Multiple close calls on a closed channel are no an error""" raise NotImplementedError() - + def closed(self): """:return: True if the channel was closed""" raise NotImplementedError() - + #} END interface - + class ChannelWriter(Writer): """The write end of a channel, a file-like interface for a channel""" __slots__ = ('channel', '_put') - + def __init__(self, channel): """Initialize the writer to use the given channel""" self.channel = channel self._put = self.channel.queue.put - + #{ Interface def write(self, item, block=False, timeout=None): return self._put(item, block, timeout) - + def size(self): return self.channel.queue.qsize() - + def close(self): - """Close the channel. Multiple close calls on a closed channel are no + """Close the channel. Multiple close calls on a closed channel are no an error""" self.channel.queue.set_writable(False) - + def closed(self): """:return: True if the channel was closed""" return not self.channel.queue.writable() - #} END interface - + #} END interface + class CallbackWriterMixin(object): - """The write end of a channel which allows you to setup a callback to be + """The write end of a channel which allows you to setup a callback to be called after an item was written to the channel""" # slots don't work with mixin's :( # __slots__ = ('_pre_cb') - + def __init__(self, *args): super(CallbackWriterMixin, self).__init__(*args) self._pre_cb = None - + def set_pre_cb(self, fun = lambda item: item): """ Install a callback to be called before the given item is written. It returns a possibly altered item which will be written to the channel instead, making it useful for pre-write item conversions. Providing None uninstalls the current method. 
- + :return: the previously installed function or None :note: Must be thread-safe if the channel is used in multiple threads""" prev = self._pre_cb self._pre_cb = fun return prev - + def write(self, item, block=True, timeout=None): if self._pre_cb: item = self._pre_cb(item) super(CallbackWriterMixin, self).write(item, block, timeout) - - + + class CallbackChannelWriter(CallbackWriterMixin, ChannelWriter): """Implements a channel writer with callback functionality""" pass - + class Reader(object): """Allows reading from a device""" __slots__ = tuple() - + #{ Interface def __init__(self, device): """Initialize the instance with the device to read from""" - + #{ Iterator protocol - + def __iter__(self): return self @@ -160,52 +166,56 @@ def __next__(self): if items: return items[0] raise StopIteration - + + def next(self): + """Support the Python 2 iterator syntax""" + return self.__next__() + #} END iterator protocol - - + + #{ Interface - + def read(self, count=0, block=True, timeout=None): """ read a list of items read from the device. The list, as a sequence - of items, is similar to the string of characters returned when reading from + of items, is similar to the string of characters returned when reading from file like objects. - + :param count: given amount of items to read. If < 1, all items will be read :param block: if True, the call will block until an item is available - :param timeout: if positive and block is True, it will block only for the + :param timeout: if positive and block is True, it will block only for the given amount of seconds, returning the items it received so far. The timeout is applied to each read item, not for the whole operation. - :return: single item in a list if count is 1, or a list of count items. + :return: single item in a list if count is 1, or a list of count items. If the device was empty and count was 1, an empty list will be returned. 
- If count was greater 1, a list with less than count items will be + If count was greater 1, a list with less than count items will be returned. - If count was < 1, a list with all items that could be read will be + If count was < 1, a list with all items that could be read will be returned.""" raise NotImplementedError() - + #} END interface - + class ChannelReader(Reader): """Allows reading from a channel. The reader is thread-safe if the channel is as well""" __slots__ = 'channel' - + def __init__(self, channel): """Initialize this instance from its parent write channel""" self.channel = channel - + #{ Interface - + def read(self, count=0, block=True, timeout=None): # if the channel is closed for writing, we never block # NOTE: is handled by the queue - # We don't check for a closed state here has it costs time - most of + # We don't check for a closed state here has it costs time - most of # the time, it will not be closed, and will bail out automatically once # it gets closed - - + + # in non-blocking mode, its all not a problem out = list() queue = self.channel.queue @@ -232,18 +242,18 @@ def read(self, count=0, block=True, timeout=None): if count == 0: count = sys.maxsize # END handle count - + i = 0 while i < count: try: out.append(queue.get(block, timeout)) i += 1 except Empty: - # here we are only if + # here we are only if # someone woke us up to inform us about the queue that changed # its writable state # The following branch checks for closed channels, and pulls - # as many items as we need and as possible, before + # as many items as we need and as possible, before # leaving the loop. 
if not queue.writable(): try: @@ -252,71 +262,71 @@ def read(self, count=0, block=True, timeout=None): i += 1 # END count loop except Empty: - break # out of count loop + break # out of count loop # END handle absolutely empty queue - # END handle closed channel - + # END handle closed channel + # if we are here, we woke up and the channel is not closed # Either the queue became writable again, which currently shouldn't # be able to happen in the channel, or someone read with a timeout # that actually timed out. - # As it timed out, which is the only reason we are here, + # As it timed out, which is the only reason we are here, # we have to abort break # END ignore empty - + # END for each item # END handle blocking return out - - #} END interface + + #} END interface class CallbackReaderMixin(object): """A channel which sends a callback before items are read from the channel""" # unfortunately, slots can only use direct inheritance, have to turn it off :( # __slots__ = "_pre_cb" - + def __init__(self, *args): super(CallbackReaderMixin, self).__init__(*args) self._pre_cb = None self._post_cb = None - + def set_pre_cb(self, fun = lambda count: None): """ - Install a callback to call with the item count to be read before any - item is actually read from the channel. + Install a callback to call with the item count to be read before any + item is actually read from the channel. Exceptions will be propagated. If a function is not provided, the call is effectively uninstalled. - + :return: the previously installed callback or None :note: The callback must be threadsafe if the channel is used by multiple threads.""" prev = self._pre_cb self._pre_cb = fun return prev - + def set_post_cb(self, fun = lambda items: items): """ - Install a callback to call after items have been read, but before + Install a callback to call after items have been read, but before they are returned to the caller. The callback may adjust the items and/or the list. 
If no function is provided, the callback is uninstalled - + :return: the previously installed function""" prev = self._post_cb self._post_cb = fun return prev - + def read(self, count=0, block=True, timeout=None): if self._pre_cb: self._pre_cb(count) items = super(CallbackReaderMixin, self).read(count, block, timeout) - + if self._post_cb: items = self._post_cb(items) return items - - - + + + class CallbackChannelReader(CallbackReaderMixin, ChannelReader): """Implements a channel reader with callback functionality""" pass @@ -326,25 +336,25 @@ class IteratorReader(Reader): """A Reader allowing to read items from an iterator, instead of a channel. Reads will never block. Its thread-safe""" __slots__ = ("_empty", '_iter', '_lock') - + # the type of the lock to use when reading from the iterator lock_type = threading.Lock - + def __init__(self, iterator): self._empty = False - if not hasattr(iterator, 'next'): + if not hasattr(iterator, 'next') and not (hasattr(iterator, "__next__")): raise ValueError("Iterator %r needs a next() function" % iterator) self._iter = iterator self._lock = self.lock_type() - + def read(self, count=0, block=True, timeout=None): """Non-Blocking implementation of read""" - # not threadsafe, but worst thing that could happen is that + # not threadsafe, but worst thing that could happen is that # we try to get items one more time if self._empty: return list() # END early abort - + self._lock.acquire() try: if count == 0: @@ -366,7 +376,7 @@ def read(self, count=0, block=True, timeout=None): finally: self._lock.release() # END handle locking - + #} END classes diff --git a/async/pool.py b/async/pool.py index 2bacf2f..bf37739 100644 --- a/async/pool.py +++ b/async/pool.py @@ -4,7 +4,7 @@ # the New BSD License: http://www.opensource.org/licenses/bsd-license.php """Implementation of a thread-pool working with channels""" from .thread import ( - WorkerThread, + WorkerThread, StopProcessing, ) from threading import Lock @@ -14,15 +14,21 @@ DummyLock 
) -from queue import ( - Queue, - Empty +try: + from queue import ( + Queue, + Empty + ) +except ImportError: + from Queue import ( + Queue, + Empty ) -from .graph import Graph +from .graph import Graph from .channel import ( mkchannel, - ChannelWriter, + ChannelWriter, Channel, SerialChannel, CallbackChannelReader @@ -41,32 +47,32 @@ class PoolReader(CallbackChannelReader): """A reader designed to read from channels which take part in pools It acts like a handle to the underlying task in the pool.""" __slots__ = ('_task_ref', '_pool_ref') - + def __init__(self, channel, task, pool): CallbackChannelReader.__init__(self, channel) self._task_ref = weakref.ref(task) self._pool_ref = weakref.ref(pool) - + def __del__(self): """Assures that our task will be deleted if we were the last reader""" task = self._task_ref() if task is None: return - + pool = self._pool_ref() if pool is None: return - - # if this is the last reader to the wc we just handled, there - # is no way anyone will ever read from the task again. If so, + + # if this is the last reader to the wc we just handled, there + # is no way anyone will ever read from the task again. If so, # delete the task in question, it will take care of itself and orphans # it might leave - # 1 is ourselves, + 1 for the call + 1, and 3 magical ones which + # 1 is ourselves, + 1 for the call + 1, and 3 magical ones which # I can't explain, but appears to be normal in the destructor # On the caller side, getrefcount returns 2, as expected - # When just calling remove_task, + # When just calling remove_task, # it has no way of knowing that the write channel is about to diminsh. 
- # which is why we pass the info as a private kwarg - not nice, but + # which is why we pass the info as a private kwarg - not nice, but # okay for now if sys.getrefcount(self) < 6: pool.remove_task(task, _from_destructor_ = True) @@ -75,20 +81,20 @@ def __del__(self): #{ Internal def _read(self, count=0, block=True, timeout=None): return CallbackChannelReader.read(self, count, block, timeout) - - + + def pool_ref(self): """:return: reference to the pool we belong to""" return self._pool_ref - + def task_ref(self): """:return: reference to the task producing our items""" return self._task_ref - + #} END internal #{ Interface - + def task(self): """:return: task we read from :raise ValueError: If the instance is not attached to at task""" @@ -96,7 +102,7 @@ def task(self): if task is None: raise ValueError("PoolReader is not associated with at task anymore") return task - + def pool(self): """:return: pool our task belongs to :raise ValueError: if the instance does not belong to a pool""" @@ -104,10 +110,10 @@ def pool(self): if pool is None: raise ValueError("PoolReader is not associated with a pool anymore") return pool - - - #} END interface - + + + #} END interface + def read(self, count=0, block=True, timeout=None): """Read an item that was processed by one of our threads :note: Triggers task dependency handling needed to provide the necessary input""" @@ -116,53 +122,53 @@ def read(self, count=0, block=True, timeout=None): # is in herently unsafe depending on the design of the task network # If we put on tasks onto the queue for every request, we are sure # to always produce enough items, even if the task.min_count actually - # provided enough - its better to have some possibly empty task runs + # provided enough - its better to have some possibly empty task runs # than having and empty queue that blocks. 
- - # if the user tries to use us to read from a done task, we will never + + # if the user tries to use us to read from a done task, we will never # compute as all produced items are already in the channel task = self._task_ref() if task is None: return list() # END abort if task was deleted - + skip_compute = task.is_done() or task.error() - + ########## prepare ############################## if not skip_compute: self._pool_ref()._prepare_channel_read(task, count) # END prepare pool scheduling - - + + ####### read data ######## ########################## # read actual items, tasks were setup to put their output into our channel ( as well ) items = CallbackChannelReader.read(self, count, block, timeout) ########################## - - + + return items - - - + + + class Pool(object): - """A thread pool maintains a set of one or more worker threads, but supports + """A thread pool maintains a set of one or more worker threads, but supports a fully serial mode in which case the amount of threads is zero. - + Work is distributed via Channels, which form a dependency graph. The evaluation is lazy, as work will only be done once an output is requested. - - The thread pools inherent issue is the global interpreter lock that it will hit, + + The thread pools inherent issue is the global interpreter lock that it will hit, which gets worse considering a few c extensions specifically lock their part globally as well. The only way this will improve is if custom c extensions are written which do some bulk work, but release the GIL once they have acquired their resources. - - Due to the nature of having multiple objects in git, its easy to distribute + + Due to the nature of having multiple objects in git, its easy to distribute that work cleanly among threads. 
- - :note: the current implementation returns channels which are meant to be - used only from the main thread, hence you cannot consume their results + + :note: the current implementation returns channels which are meant to be + used only from the main thread, hence you cannot consume their results from multiple threads unless you use a task for it.""" __slots__ = ( '_tasks', # a graph of tasks '_num_workers', # list of workers @@ -170,21 +176,21 @@ class Pool(object): '_taskorder_cache', # map task id -> ordered dependent tasks '_taskgraph_lock', # lock for accessing the task graph ) - + # CONFIGURATION - # The type of worker to create - its expected to provide the Thread interface, + # The type of worker to create - its expected to provide the Thread interface, # taking the taskqueue as only init argument # as well as a method called stop_and_join() to terminate it WorkerCls = None - - # The type of lock to use to protect critical sections, providing the + + # The type of lock to use to protect critical sections, providing the # threading.Lock interface LockCls = None - + # the type of the task queue to use - it must provide the Queue interface TaskQueueCls = None - - + + def __init__(self, size=0): self._tasks = Graph() self._num_workers = 0 @@ -192,25 +198,25 @@ def __init__(self, size=0): self._taskgraph_lock = self.LockCls() self._taskorder_cache = dict() self.set_size(size) - + def __del__(self): self.set_size(0) - + #{ Internal - + def _prepare_channel_read(self, task, count): - """Process the tasks which depend on the given one to be sure the input + """Process the tasks which depend on the given one to be sure the input channels are filled with data once we process the actual task - - Tasks have two important states: either they are done, or they are done + + Tasks have two important states: either they are done, or they are done and have an error, so they are likely not to have finished all their work. 
- - Either way, we will put them onto a list of tasks to delete them, providng + + Either way, we will put them onto a list of tasks to delete them, providng information about the failed ones. - - Tasks which are not done will be put onto the queue for processing, which + + Tasks which are not done will be put onto the queue for processing, which is fine as we walked them depth-first.""" - # for the walk, we must make sure the ordering does not change. Even + # for the walk, we must make sure the ordering does not change. Even # when accessing the cache, as it is related to graph changes self._taskgraph_lock.acquire() try: @@ -224,16 +230,16 @@ def _prepare_channel_read(self, task, count): finally: self._taskgraph_lock.release() # END handle locking - - # check the min count on all involved tasks, and be sure that we don't + + # check the min count on all involved tasks, and be sure that we don't # have any task which produces less than the maximum min-count of all tasks - # The actual_count is used when chunking tasks up for the queue, whereas + # The actual_count is used when chunking tasks up for the queue, whereas # the count is usued to determine whether we still have enough output # on the queue, checking qsize ( ->revise ) # ABTRACT: If T depends on T-1, and the client wants 1 item, T produces - # at least 10, T-1 goes with 1, then T will block after 1 item, which - # is read by the client. On the next read of 1 item, we would find T's - # queue empty and put in another 10, which could put another thread into + # at least 10, T-1 goes with 1, then T will block after 1 item, which + # is read by the client. On the next read of 1 item, we would find T's + # queue empty and put in another 10, which could put another thread into # blocking state. T-1 produces one more item, which is consumed right away # by the two threads running T. Although this works in the end, it leaves # many threads blocking and waiting for input, which is not desired. 
@@ -247,28 +253,28 @@ def _prepare_channel_read(self, task, count): if 0 < count < min_count: actual_count = min_count # END set actual count - - # the list includes our tasks - the first one to evaluate first, the + + # the list includes our tasks - the first one to evaluate first, the # requested one last - for task in dfirst_tasks: + for task in dfirst_tasks: # if task.error() or task.is_done(): # in theory, the should never be consumed task in the pool, right ? - # They delete themselves once they are done. But as we run asynchronously, - # It can be that someone reads, while a task realizes its done, and + # They delete themselves once they are done. But as we run asynchronously, + # It can be that someone reads, while a task realizes its done, and # we get here to prepare the read although it already is done. # Its not a problem though, the task wiill not do anything. # Hence we don't waste our time with checking for it # raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?") # END skip processing - - # but use the actual count to produce the output, we may produce + + # but use the actual count to produce the output, we may produce # more than requested numchunks = 1 chunksize = actual_count remainder = 0 - + # we need the count set for this - can't chunk up unlimited items - # In serial mode we could do this by checking for empty input channels, + # In serial mode we could do this by checking for empty input channels, # but in dispatch mode its impossible ( == not easily possible ) # Only try it if we have enough demand if task.max_chunksize and actual_count > task.max_chunksize: @@ -276,12 +282,16 @@ def _prepare_channel_read(self, task, count): chunksize = task.max_chunksize remainder = actual_count - (numchunks * chunksize) # END handle chunking - + + # These both need to be integers, not floats + chunksize = int(chunksize) + numchunks = int(numchunks) + # the following loops are kind of unrolled - code 
duplication - # should make things execute faster. Putting the if statements + # should make things execute faster. Putting the if statements # into the loop would be less code, but ... slower if self._num_workers: - # respect the chunk size, and split the task up if we want + # respect the chunk size, and split the task up if we want # to process too much. This can be defined per task qput = self._queue.put if numchunks > 1: @@ -291,7 +301,7 @@ def _prepare_channel_read(self, task, count): else: qput((task.process, chunksize)) # END try efficient looping - + if remainder: qput((task.process, remainder)) # END handle chunksize @@ -304,46 +314,46 @@ def _prepare_channel_read(self, task, count): else: task.process(chunksize) # END try efficient looping - + if remainder: task.process(remainder) # END handle chunksize # END handle serial mode # END for each task to process - - + + def _remove_task_if_orphaned(self, task, from_destructor): """Check the task, and delete it if it is orphaned""" # 1 for writer on task, 1 for the getrefcount call + 1 for each other writer/reader - # If we are getting here from the destructor of an RPool channel, - # its totally valid to virtually decrement the refcount by 1 as + # If we are getting here from the destructor of an RPool channel, + # its totally valid to virtually decrement the refcount by 1 as # we can expect it to drop once the destructor completes, which is when # we finish all recursive calls max_ref_count = 3 + from_destructor if sys.getrefcount(task.writer().channel) < max_ref_count: self.remove_task(task, from_destructor) #} END internal - - #{ Interface + + #{ Interface def size(self): """:return: amount of workers in the pool :note: method is not threadsafe !""" return self._num_workers - + def set_size(self, size=0): - """Set the amount of workers to use in this pool. When reducing the size, + """Set the amount of workers to use in this pool. 
When reducing the size, threads will continue with their work until they are done before effectively being removed. - + :return: self - :param size: if 0, the pool will do all work itself in the calling thread, + :param size: if 0, the pool will do all work itself in the calling thread, otherwise the work will be distributed among the given amount of threads. - If the size is 0, newly added tasks will use channels which are NOT + If the size is 0, newly added tasks will use channels which are NOT threadsafe to optimize item throughput. - + :note: currently NOT threadsafe !""" assert size > -1, "Size cannot be negative" - + # either start new threads, or kill existing ones. # If we end up with no threads, we process the remaining chunks on the queue # ourselves @@ -359,7 +369,7 @@ def set_size(self, size=0): self._num_workers += add_count elif cur_count > size: # We don't care which thread exactly gets hit by our stop request - # On their way, they will consume remaining tasks, but new ones + # On their way, they will consume remaining tasks, but new ones # could be added as we speak. del_count = cur_count - size for i in range(del_count): @@ -367,22 +377,22 @@ def set_size(self, size=0): # END for each thread to stop self._num_workers -= del_count # END handle count - + if size == 0: - # NOTE: we do not preocess any tasks still on the queue, as we ill + # NOTE: we do not preocess any tasks still on the queue, as we ill # naturally do that once we read the next time, only on the tasks - # that are actually required. The queue will keep the tasks, + # that are actually required. The queue will keep the tasks, # and once we are deleted, they will vanish without additional # time spend on them. If there shouldn't be any consumers anyway. - # If we should reenable some workers again, they will continue on the + # If we should reenable some workers again, they will continue on the # remaining tasks, probably with nothing to do. 
- # We can't clear the task queue if we have removed workers - # as they will receive the termination signal through it, and if + # We can't clear the task queue if we have removed workers + # as they will receive the termination signal through it, and if # we had added workers, we wouldn't be here ;). - pass + pass # END process queue return self - + def num_tasks(self): """:return: amount of tasks""" self._taskgraph_lock.acquire() @@ -390,35 +400,35 @@ def num_tasks(self): return len(self._tasks.nodes) finally: self._taskgraph_lock.release() - + def remove_task(self, task, _from_destructor_ = False): """ Delete the task. - Additionally we will remove orphaned tasks, which can be identified if their - output channel is only held by themselves, so no one will ever consume + Additionally we will remove orphaned tasks, which can be identified if their + output channel is only held by themselves, so no one will ever consume its items. - - This method blocks until all tasks to be removed have been processed, if + + This method blocks until all tasks to be removed have been processed, if they are currently being processed. - + :return: self""" self._taskgraph_lock.acquire() try: - # it can be that the task is already deleted, but its chunk was on the + # it can be that the task is already deleted, but its chunk was on the # queue until now, so its marked consumed again if not task in self._tasks.nodes: return self # END early abort - - # the task we are currently deleting could also be processed by + + # the task we are currently deleting could also be processed by # a thread right now. We don't care about it as its taking care about # its write channel itself, and sends everything it can to it. # For it it doesn't matter that its not part of our task graph anymore. - - # now delete our actual node - be sure its done to prevent further + + # now delete our actual node - be sure its done to prevent further # processing in case there are still client reads on their way. 
task.set_done() - + # keep its input nodes as we check whether they were orphaned in_tasks = task.in_nodes self._tasks.remove_node(task) @@ -426,22 +436,22 @@ def remove_task(self, task, _from_destructor_ = False): finally: self._taskgraph_lock.release() # END locked deletion - + for t in in_tasks: self._remove_task_if_orphaned(t, _from_destructor_) # END handle orphans recursively - + return self - + def add_task(self, task): """Add a new task to be processed. - - :return: a read channel to retrieve processed items. If that handle is lost, - the task will be considered orphaned and will be deleted on the next + + :return: a read channel to retrieve processed items. If that handle is lost, + the task will be considered orphaned and will be deleted on the next occasion.""" # create a write channel for it ctype = Channel - + # adjust the task with our pool ref, if it has the slot and is empty # For now, we don't allow tasks to be used in multiple pools, except # for by their channels @@ -453,18 +463,18 @@ def add_task(self, task): raise ValueError("Task %r is already registered to another pool" % task.id) # END handle pool exclusivity # END handle pool aware tasks - + self._taskgraph_lock.acquire() try: self._taskorder_cache.clear() self._tasks.add_node(task) - + # Use a non-threadsafe queue # This brings about 15% more performance, but sacrifices thread-safety if self.size() == 0: ctype = SerialChannel # END improve locks - + # setup the tasks channel - respect the task creators choice though # if it is set. 
wc = task.writer() @@ -480,7 +490,7 @@ def add_task(self, task): finally: self._taskgraph_lock.release() # END sync task addition - + # If the input channel is one of our read channels, we add the relation if hasattr(task, 'reader'): ic = task.reader() @@ -488,8 +498,8 @@ def add_task(self, task): self._taskgraph_lock.acquire() try: self._tasks.add_edge(ic._task_ref(), task) - - # additionally, bypass ourselves when reading from the + + # additionally, bypass ourselves when reading from the # task, if possible if hasattr(ic, '_read'): task.set_read(ic._read) @@ -499,12 +509,12 @@ def add_task(self, task): # END handle edge-adding # END add task relation # END handle input channels for connections - + return rc - - #} END interface - - + + #} END interface + + class ThreadPool(Pool): """A pool using threads as worker""" WorkerCls = WorkerThread diff --git a/async/test/task.py b/async/test/task.py index 25c00d1..4c811ed 100644 --- a/async/test/task.py +++ b/async/test/task.py @@ -17,7 +17,7 @@ def __init__(self, *args, **kwargs): self.plock = threading.Lock() self.item_count = 0 self.process_count = 0 - + def do_fun(self, item): self.lock.acquire() self.item_count += 1 @@ -25,7 +25,7 @@ def do_fun(self, item): if self.should_fail: raise AssertionError("I am failing just for the fun of it") return item - + def process(self, count=1): # must do it first, otherwise we might read and check results before # the thread gets here :). Its a lesson ! 
@@ -33,7 +33,7 @@ def process(self, count=1): self.process_count += 1 self.plock.release() super(_TestTaskBase, self).process(count) - + def _assert(self, pc, fc, check_scheduled=False): """Assert for num process counts (pc) and num function counts (fc) :return: self""" @@ -42,12 +42,12 @@ def _assert(self, pc, fc, check_scheduled=False): print(self.item_count, fc) assert self.item_count == fc self.lock.release() - - # NOTE: asserting num-writers fails every now and then, implying a thread is - # still processing (an empty chunk) when we are checking it. This can + + # NOTE: asserting num-writers fails every now and then, implying a thread is + # still processing (an empty chunk) when we are checking it. This can # only be prevented by checking the scheduled items, which requires locking - # and causes slowdows, so we don't do that. If the num_writers - # counter wouldn't be maintained properly, more tests would fail, so + # and causes slowdows, so we don't do that. If the num_writers + # counter wouldn't be maintained properly, more tests would fail, so # we can safely refrain from checking this here # self._wlock.acquire() # assert self._num_writers == 0 @@ -57,17 +57,17 @@ def _assert(self, pc, fc, check_scheduled=False): class TestThreadTask(_TestTaskBase, IteratorThreadTask): pass - + class TestFailureThreadTask(TestThreadTask): """Fails after X items""" def __init__(self, *args, **kwargs): self.fail_after = kwargs.pop('fail_after') super(TestFailureThreadTask, self).__init__(*args, **kwargs) - + def do_fun(self, item): item = TestThreadTask.do_fun(self, item) - + self.lock.acquire() try: if self.item_count > self.fail_after: @@ -76,18 +76,18 @@ def do_fun(self, item): self.lock.release() # END handle fail after return item - + class TestChannelThreadTask(_TestTaskBase, ChannelThreadTask): """Apply a transformation on items read from an input channel""" def __init__(self, *args, **kwargs): self.fail_after = kwargs.pop('fail_after', 0) 
super(TestChannelThreadTask, self).__init__(*args, **kwargs) - + def do_fun(self, item): """return tuple(i, i*2)""" item = super(TestChannelThreadTask, self).do_fun(item) - + # fail after support if self.fail_after: self.lock.acquire() @@ -97,7 +97,7 @@ def do_fun(self, item): finally: self.lock.release() # END handle fail-after - + if isinstance(item, tuple): i = item[0] return item + (i * self.id, ) @@ -109,28 +109,28 @@ def do_fun(self, item): class TestPerformanceThreadTask(ChannelThreadTask): """Applies no operation to the item, and does not lock, measuring the actual throughput of the system""" - + def do_fun(self, item): return item class TestVerifyChannelThreadTask(_TestTaskBase, ChannelThreadTask): - """An input channel task, which verifies the result of its input channels, + """An input channel task, which verifies the result of its input channels, should be last in the chain. Id must be int""" - + def do_fun(self, item): """return tuple(i, i*2)""" item = super(TestVerifyChannelThreadTask, self).do_fun(item) - + # make sure the computation order matches assert isinstance(item, tuple), "input was no tuple: %s" % item - + base = item[0] for id, num in enumerate(item[1:]): assert num == base * id, "%i != %i, orig = %s" % (num, base * id, str(item)) # END verify order - + return item #{ Utilities @@ -140,60 +140,60 @@ def make_proxy_method(t): wt = weakref.proxy(t) return lambda item: wt.do_fun(item) -def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0, - feedercls=TestThreadTask, transformercls=TestChannelThreadTask, +def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0, + feedercls=TestThreadTask, transformercls=TestChannelThreadTask, include_verifier=True): - """Create a task chain of feeder, count transformers and order verifcator + """Create a task chain of feeder, count transformers and order verifcator to the pool p, like t1 -> t2 -> t3 - :param fail_setup: a list of pairs, task_id, 
fail_after, i.e. [(2, 20)] would + :param fail_setup: a list of pairs, task_id, fail_after, i.e. [(2, 20)] would make the third transformer fail after 20 items - :param feeder_channel: if set to a channel, it will be used as input of the - first transformation task. The respective first task in the return value + :param feeder_channel: if set to a channel, it will be used as input of the + first transformation task. The respective first task in the return value will be None. :param id_offset: defines the id of the first transformation task, all subsequent ones will add one :return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))""" nt = p.num_tasks() - + feeder = None frc = feeder_channel if feeder_channel is None: feeder = make_iterator_task(ni, taskcls=feedercls) frc = p.add_task(feeder) # END handle specific feeder - + rcs = [frc] tasks = [feeder] - + inrc = frc for tc in range(count): t = transformercls(inrc, tc+id_offset, None) - + t.fun = make_proxy_method(t) #t.fun = t.do_fun inrc = p.add_task(t) - + tasks.append(t) rcs.append(inrc) # END create count transformers - + # setup failure for id, fail_after in fail_setup: tasks[1+id].fail_after = fail_after - # END setup failure - + # END setup failure + if include_verifier: verifier = TestVerifyChannelThreadTask(inrc, 'verifier', None) #verifier.fun = verifier.do_fun verifier.fun = make_proxy_method(verifier) vrc = p.add_task(verifier) - - + + tasks.append(verifier) rcs.append(vrc) # END handle include verifier return tasks, rcs - + def make_iterator_task(ni, taskcls=TestThreadTask, **kwargs): """:return: task which yields ni items :param taskcls: the actual iterator type to use diff --git a/async/test/test_channel.py b/async/test/test_channel.py index 44ae4e3..6344323 100644 --- a/async/test/test_channel.py +++ b/async/test/test_channel.py @@ -3,55 +3,58 @@ # This module is part of async and is released under # the New BSD License: http://www.opensource.org/licenses/bsd-license.php """Channel testing""" 
-from .lib import * -from async.channel import * +from .lib import TestBase +from async.channel import ( + CallbackChannelWriter, CallbackChannelReader, ChannelWriter, ChannelReader, + IteratorReader, mkchannel, ReadOnly +) import time class TestChannels(TestBase): - + def test_base(self): # creating channel yields a write and a read channal wc, rc = mkchannel() assert isinstance(wc, ChannelWriter) # default args assert isinstance(rc, ChannelReader) - - + + # TEST UNLIMITED SIZE CHANNEL - writing+reading is FIFO item = 1 item2 = 2 wc.write(item) wc.write(item2) - + # read all - it blocks as its still open for writing to = 0.2 st = time.time() assert rc.read(timeout=to) == [item, item2] assert time.time() - st >= to - + # next read blocks. it waits a second st = time.time() assert len(rc.read(1, True, to)) == 0 assert time.time() - st >= to - + # writing to a closed channel raises assert not wc.closed() wc.close() assert wc.closed() wc.close() # fine assert wc.closed() - + self.failUnlessRaises(ReadOnly, wc.write, 1) - + # reading from a closed channel never blocks assert len(rc.read()) == 0 assert len(rc.read(5)) == 0 assert len(rc.read(1)) == 0 - - + + # test callback channels wc, rc = mkchannel(wtype = CallbackChannelWriter, rtype = CallbackChannelReader) - + cb = [0, 0, 0] # set slots to one if called def pre_write(item): cb[0] = 1 @@ -62,8 +65,8 @@ def post_read(items): assert isinstance(items, list) cb[2] = 1 return [ i+1 for i in items] - - + + # set, verify it returns previous one assert wc.set_pre_cb(pre_write) is None assert rc.set_pre_cb(pre_read) is None @@ -71,17 +74,17 @@ def post_read(items): assert wc.set_pre_cb(pre_write) is pre_write assert rc.set_pre_cb(pre_read) is pre_read assert rc.set_post_cb(post_read) is post_read - + # writer transforms input val = 5 wc.write(val) assert cb[0] == 1 and cb[1] == 0 - + rval = rc.read(1)[0] # read one item, must not block assert cb[0] == 1 and cb[1] == 1 and cb[2] == 1 assert rval == val + 1 + 1 - - + + # 
ITERATOR READER reader = IteratorReader(iter(list(range(10)))) assert len(reader.read(2)) == 2 @@ -89,14 +92,14 @@ def post_read(items): # its empty now assert len(reader.read(0)) == 0 assert len(reader.read(5)) == 0 - + # doesn't work if item is not an iterator self.failUnlessRaises(ValueError, IteratorReader, list()) - - + + # test general read-iteration - its supported by all readers reader = IteratorReader(iter(list(range(10)))) + assert len(list(reader)) == 10 - - # NOTE: its thread-safety is tested by the pool - + + # NOTE: its thread-safety is tested by the pool diff --git a/async/test/test_performance.py b/async/test/test_performance.py index fb35ff1..25435b4 100644 --- a/async/test/test_performance.py +++ b/async/test/test_performance.py @@ -16,9 +16,9 @@ class TestThreadPoolPerformance(TestBase): - + max_threads = cpu_count() - + def test_base(self): # create a dependency network, and see how the performance changes # when adjusting the amount of threads @@ -29,16 +29,16 @@ def test_base(self): pool.set_size(num_threads) for num_transformers in (1, 5, 10): for read_mode in range(2): - ts, rcs = add_task_chain(pool, ni, count=num_transformers, - feedercls=IteratorThreadTask, - transformercls=TestPerformanceThreadTask, + ts, rcs = add_task_chain(pool, ni, count=num_transformers, + feedercls=IteratorThreadTask, + transformercls=TestPerformanceThreadTask, include_verifier=False) - + mode_info = "read(0)" if read_mode == 1: mode_info = "read(1) * %i" % ni # END mode info - fmt = "Threadcount=%%i: Produced %%i items using %s in %%i transformations in %%f s (%%f items / s)" % mode_info + fmt = "Threadcount=%%i: Produced %%i items using %s in %%i transformations in %%f s (%%f items / s)\n" % mode_info reader = rcs[-1] st = time.time() if read_mode == 1: @@ -49,7 +49,7 @@ def test_base(self): assert len(reader.read(0)) == ni # END handle read mode elapsed = time.time() - st - print(fmt % (num_threads, ni, num_transformers, elapsed, ni / elapsed), file=sys.stderr) 
+ sys.stderr.write(fmt % (num_threads, ni, num_transformers, elapsed, ni / elapsed)) # END for each read-mode # END for each amount of processors # END for each thread count diff --git a/async/test/test_pool.py b/async/test/test_pool.py index a9beaf6..4c58217 100644 --- a/async/test/test_pool.py +++ b/async/test/test_pool.py @@ -18,31 +18,31 @@ class TestThreadPool(TestBase): - + max_threads = cpu_count() - + def _assert_single_task(self, p, async=False): """Performs testing in a synchronized environment""" - print("Threadpool: Starting single task (async = %i) with %i threads" % (async, p.size()), file=sys.stderr) + sys.stderr.write("Threadpool: Starting single task (async = %i) with %i threads\n" % (async, p.size())) null_tasks = p.num_tasks() # in case we had some before - + # add a simple task # it iterates n items ni = 1000 assert ni % 2 == 0, "ni needs to be dividable by 2" assert ni % 4 == 0, "ni needs to be dividable by 4" - + make_task = lambda *args, **kwargs: make_iterator_task(ni, *args, **kwargs) - + task = make_task() - + assert p.num_tasks() == null_tasks rc = p.add_task(task) assert p.num_tasks() == 1 + null_tasks assert isinstance(rc, PoolReader) assert task._out_writer is not None - - # pull the result completely - we should get one task, which calls its + + # pull the result completely - we should get one task, which calls its # function once. 
In sync mode, the order matches print("read(0)") items = rc.read() @@ -50,13 +50,13 @@ def _assert_single_task(self, p, async=False): task._assert(1, ni) if not async: assert items[0] == 0 and items[-1] == ni-1 - + # as the task is done, it should have been removed - we have read everything assert task.is_done() del(rc) assert p.num_tasks() == null_tasks task = make_task() - + # pull individual items rc = p.add_task(task) assert p.num_tasks() == 1 + null_tasks @@ -65,22 +65,22 @@ def _assert_single_task(self, p, async=False): for i in range(ni): items = rc.read(1) assert len(items) == 1 - + # can't assert order in async mode if not async: assert i == items[0] # END for each item elapsed = time.time() - st - print("Threadpool: processed %i individual items, with %i threads, one at a time, in %f s ( %f items / s )" % (ni, p.size(), elapsed, ni / elapsed), file=sys.stderr) - - # it couldn't yet notice that the input is depleted as we pulled exaclty - # ni items - the next one would remove it. Instead, we delete our channel + sys.stderr.write("Threadpool: processed %i individual items, with %i threads, one at a time, in %f s ( %f items / s )\n" % (ni, p.size(), elapsed, ni / elapsed)) + + # it couldn't yet notice that the input is depleted as we pulled exaclty + # ni items - the next one would remove it. Instead, we delete our channel # which triggers orphan handling assert not task.is_done() assert p.num_tasks() == 1 + null_tasks del(rc) assert p.num_tasks() == null_tasks - + # test min count # if we query 1 item, it will prepare ni / 2 task = make_task() @@ -95,14 +95,14 @@ def _assert_single_task(self, p, async=False): # rest - it has ni/2 - 2 on the queue, and pulls ni-2 # It wants too much, so the task realizes its done. 
The task # doesn't care about the items in its output channel - nri = ni-2 + nri = ni - 2 print("read(%i)" % nri) items = rc.read(nri) assert len(items) == nri p.remove_task(task) assert p.num_tasks() == null_tasks task._assert(2, ni) # two chunks, ni calls - + # its already done, gives us no more, its still okay to use it though # as a task doesn't have to be in the graph to allow reading its produced # items @@ -112,39 +112,39 @@ def _assert_single_task(self, p, async=False): # When we start reading, we should wake up once it sends its signal # assert task.is_closed() assert len(rc.read()) == 0 - + # test chunking # we always want 4 chunks, these could go to individual nodes task = make_task() - task.min_count = ni / 2 # restore previous value - task.max_chunksize = ni / 4 # 4 chunks + task.min_count = ni // 2 # restore previous value + task.max_chunksize = ni // 4 # 4 chunks rc = p.add_task(task) - + # must read a specific item count # count is still at ni / 2 - here we want more than that # 2 steps with n / 4 items, + 1 step with n/4 items to get + 2 - nri = ni / 2 + 2 + nri = ni // 2 + 2 print("read(%i) chunksize set" % nri) items = rc.read(nri) assert len(items) == nri # have n / 4 - 2 items on queue, want n / 4 in first chunk, cause 1 processing # ( 4 in total ). Still want n / 4 - 2 in second chunk, causing another processing - nri = ni / 2 - 2 + nri = ni // 2 - 2 print("read(%i) chunksize set" % nri) items = rc.read(nri) assert len(items) == nri - + task._assert( 5, ni) - + # delete the handle first, causing the task to be removed and to be set - # done. We check for the set-done state later. Depending on the timing, - # The task is not yet set done when we are checking it because we were + # done. We check for the set-done state later. Depending on the timing, + # The task is not yet set done when we are checking it because we were # scheduled in before the flag could be set. 
del(rc) assert task.is_done() assert p.num_tasks() == null_tasks # depleted - - # but this only hits if we want too many items, if we want less, it could + + # but this only hits if we want too many items, if we want less, it could # still do too much - hence we set the min_count to the same number to enforce # at least ni / 4 items to be preocessed, no matter what we request task = make_task() @@ -162,13 +162,13 @@ def _assert_single_task(self, p, async=False): # END pull individual items # too many processing counts ;) elapsed = time.time() - st - print("Threadpool: processed %i individual items in chunks of %i, with %i threads, one at a time, in %f s ( %f items / s )" % (ni, ni/4, p.size(), elapsed, ni / elapsed), file=sys.stderr) - + sys.stderr.write("Threadpool: processed %i individual items in chunks of %i, with %i threads, one at a time, in %f s ( %f items / s)\n" % (ni, ni/4, p.size(), elapsed, ni / elapsed)) + task._assert(ni, ni) assert p.num_tasks() == 1 + null_tasks assert p.remove_task(task) is p # del manually this time assert p.num_tasks() == null_tasks - + # now with we set the minimum count to reduce the number of processing counts task = make_task() task.min_count = ni / 4 @@ -184,21 +184,21 @@ def _assert_single_task(self, p, async=False): task._assert(ni / task.min_count, ni) del(rc) assert p.num_tasks() == null_tasks - + # test failure - # on failure, the processing stops and the task is finished, keeping + # on failure, the processing stops and the task is finished, keeping # his error for later task = make_task() task.should_fail = True rc = p.add_task(task) print("read(0) with failure") assert len(rc.read()) == 0 # failure on first item - + assert isinstance(task.error(), AssertionError) assert task.is_done() # on error, its marked done as well del(rc) assert p.num_tasks() == null_tasks - + # test failure after ni / 2 items # This makes sure it correctly closes the channel on failure to prevent blocking nri = ni/2 @@ -207,28 +207,28 @@ def 
_assert_single_task(self, p, async=False): assert len(rc.read()) == nri assert task.is_done() assert isinstance(task.error(), AssertionError) - - print("done with everything", file=sys.stderr) - - - + + sys.stderr.write("done with everything\n") + + + def _assert_async_dependent_tasks(self, pool): # includes failure in center task, 'recursive' orphan cleanup # This will also verify that the channel-close mechanism works # t1 -> t2 -> t3 - - print("Threadpool: starting async dependency test in %i threads" % pool.size(), file=sys.stderr) + + sys.stderr.write("Threadpool: starting async dependency test in %i threads\n" % pool.size()) null_tasks = pool.num_tasks() ni = 1000 count = 3 aic = count + 2 make_task = lambda *args, **kwargs: add_task_chain(pool, ni, count, *args, **kwargs) - + ts, rcs = make_task() assert len(ts) == aic assert len(rcs) == aic assert pool.num_tasks() == null_tasks + len(ts) - + # read(0) ######### st = time.time() @@ -238,14 +238,14 @@ def _assert_async_dependent_tasks(self, pool): assert len(items) == ni del(rcs) assert pool.num_tasks() == 0 # tasks depleted, all done, no handles - # wait a tiny moment - there could still be something unprocessed on the + # wait a tiny moment - there could still be something unprocessed on the # queue, increasing the refcount time.sleep(0.15) assert sys.getrefcount(ts[-1]) == 2 # ts + call assert sys.getrefcount(ts[0]) == 2 # ts + call - print("Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed), file=sys.stderr) - - + sys.stderr.write("Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )\n" % (ni, aic, elapsed, ni / elapsed)) + + # read(1) ######### ts, rcs = make_task() @@ -257,13 +257,13 @@ def _assert_async_dependent_tasks(self, pool): elapsed_single = time.time() - st # another read yields nothing, its empty assert len(rcs[-1].read()) == 0 - print("Dependent Tasks: evaluated %i items with read(1) of %i dependent in %f s 
( %i items / s )" % (ni, aic, elapsed_single, ni / elapsed_single), file=sys.stderr) - - + sys.stderr.write("Dependent Tasks: evaluated %i items with read(1) of %i dependent in %f s ( %i items / s )\n" % (ni, aic, elapsed_single, ni / elapsed_single)) + + # read with min-count size ########################### # must be faster, as it will read ni / 4 chunks - # Its enough to set one task, as it will force all others in the chain + # Its enough to set one task, as it will force all others in the chain # to min_size as well. ts, rcs = make_task() assert pool.num_tasks() == len(ts) @@ -277,18 +277,18 @@ def _assert_async_dependent_tasks(self, pool): elapsed_minsize = time.time() - st # its empty assert len(rcs[-1].read()) == 0 - print("Dependent Tasks: evaluated %i items with read(1), min_size=%i, of %i dependent in %f s ( %i items / s )" % (ni, nri, aic, elapsed_minsize, ni / elapsed_minsize), file=sys.stderr) - + sys.stderr.write("Dependent Tasks: evaluated %i items with read(1), min_size=%i, of %i dependent in %f s ( %i items / s )\n" % (ni, nri, aic, elapsed_minsize, ni / elapsed_minsize)) + # it should have been a bit faster at least, and most of the time it is # Sometimes, its not, mainly because: # * The test tasks lock a lot, hence they slow down the system # * Each read will still trigger the pool to evaluate, causing some overhead - # even though there are enough items on the queue in that case. Keeping - # track of the scheduled items helped there, but it caused further inacceptable + # even though there are enough items on the queue in that case. 
Keeping + # track of the scheduled items helped there, but it caused further inacceptable # slowdown # assert elapsed_minsize < elapsed_single - - + + # read with failure ################### # it should recover and give at least fail_after items @@ -297,13 +297,13 @@ def _assert_async_dependent_tasks(self, pool): ts, rcs = make_task(fail_setup=[(0, fail_after)]) items = rcs[-1].read() assert len(items) == fail_after - - + + # MULTI-POOL # If two pools are connected, this shold work as well. # The second one has just one more thread ts, rcs = make_task() - + # connect verifier channel as feeder of the second pool p2 = ThreadPool(0) # don't spawn new threads, they have the tendency not to wake up on mutexes assert p2.size() == 0 @@ -312,28 +312,28 @@ def _assert_async_dependent_tasks(self, pool): assert rcs[-1].pool_ref()() is pool # it didnt change the pool assert rcs[-1] is p2ts[1].reader() assert p2.num_tasks() == len(p2ts)-1 # first is None - + # reading from the last one will evaluate all pools correctly print("read(0) multi-pool") st = time.time() items = p2rcs[-1].read() elapsed = time.time() - st assert len(items) == ni - - print("Dependent Tasks: evaluated 2 connected pools and %i items with read(0), of %i dependent tasks in %f s ( %i items / s )" % (ni, aic + aic-1, elapsed, ni / elapsed), file=sys.stderr) - - + + sys.stderr.write("Dependent Tasks: evaluated 2 connected pools and %i items with read(0), of %i dependent tasks in %f s ( %i items / s )\n" % (ni, aic + aic-1, elapsed, ni / elapsed)) + + # loose the handles of the second pool to allow others to go as well del(p2rcs); del(p2ts) assert p2.num_tasks() == 0 - + # now we lost our old handles as well, and the tasks go away ts, rcs = make_task() assert pool.num_tasks() == len(ts) - + p2ts, p2rcs = add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count) - assert p2.num_tasks() == len(p2ts) - 1 - + assert p2.num_tasks() == len(p2ts) - 1 + # Test multi-read(1) print("read(1) * %i" % ni) 
reader = rcs[-1] @@ -344,14 +344,14 @@ def _assert_async_dependent_tasks(self, pool): # END for each item to get elapsed = time.time() - st del(reader) # decrement refcount - - print("Dependent Tasks: evaluated 2 connected pools and %i items with read(1), of %i dependent tasks in %f s ( %i items / s )" % (ni, aic + aic-1, elapsed, ni / elapsed), file=sys.stderr) - + + sys.stderr.write("Dependent Tasks: evaluated 2 connected pools and %i items with read(1), of %i dependent tasks in %f s ( %i items / s )\n" % (ni, aic + aic-1, elapsed, ni / elapsed)) + # another read is empty assert len(rcs[-1].read()) == 0 - + # now that both are connected, I can drop my handle to the reader - # without affecting the task-count, but whats more important: + # without affecting the task-count, but whats more important: # They remove their tasks correctly once we drop our references in the # right order del(p2ts) @@ -359,23 +359,23 @@ def _assert_async_dependent_tasks(self, pool): del(p2rcs) assert p2.num_tasks() == 0 del(p2) - + assert pool.num_tasks() == null_tasks + len(ts) - - + + del(ts) del(rcs) - - assert pool.num_tasks() == null_tasks - - + + assert pool.num_tasks() == null_tasks + + # ASSERTION: We already tested that one pool behaves correctly when an error # occours - if two pools handle their ref-counts correctly, which they - # do if we are here, then they should handle errors happening during + # do if we are here, then they should handle errors happening during # the task processing as expected as well. 
Hence we can safe this here - - - + + + @terminate_threads def test_base(self): max_wait_attempts = 3 @@ -385,30 +385,30 @@ def test_base(self): if len(threading.enumerate()) != 1: time.sleep(sleep_time) # END for each attempt - assert len(threading.enumerate()) == 1, "Waited %f s for threads to die, its still alive" % (max_wait_attempts, sleep_time) - + assert len(threading.enumerate()) == 1, "Waited %f s for threads to die, its still alive" % (max_wait_attempts, sleep_time) + p = ThreadPool() - + # default pools have no workers assert p.size() == 0 - + # increase and decrease the size num_threads = len(threading.enumerate()) for i in range(self.max_threads): p.set_size(i) assert p.size() == i assert len(threading.enumerate()) == num_threads + i - + for i in range(self.max_threads, -1, -1): p.set_size(i) assert p.size() == i - + assert p.size() == 0 # threads should be killed already, but we let them a tiny amount of time # just to be sure time.sleep(0.05) assert len(threading.enumerate()) == num_threads - + # SINGLE TASK SERIAL SYNC MODE ############################## # put a few unrelated tasks that we forget about - check ref counts and cleanup @@ -416,36 +416,36 @@ def test_base(self): urc1 = p.add_task(t1) urc2 = p.add_task(t2) assert p.num_tasks() == 2 - + # test pool reader assert urc1.pool_ref()() is p assert urc1.task_ref()() is t1 assert urc1.pool() == p assert urc1.task() == t1 - + ## SINGLE TASK ################# self._assert_single_task(p, False) assert p.num_tasks() == 2 del(urc1) assert p.num_tasks() == 1 - + p.remove_task(t2) assert p.num_tasks() == 0 assert sys.getrefcount(t2) == 2 - + t3 = TestChannelThreadTask(urc2, "channel", None) urc3 = p.add_task(t3) assert p.num_tasks() == 1 del(urc3) assert p.num_tasks() == 0 assert sys.getrefcount(t3) == 2 - - + + # DEPENDENT TASKS SYNC MODE ########################### self._assert_async_dependent_tasks(p) - - + + # SINGLE TASK THREADED ASYNC MODE ( 1 thread ) 
############################################## # step one gear up - just one thread for now. @@ -457,30 +457,29 @@ def test_base(self): del(p) time.sleep(0.05) assert len(threading.enumerate()) == num_threads - + p = ThreadPool(1) assert len(threading.enumerate()) == num_threads + 1 - + # here we go self._assert_single_task(p, True) - - - + + + # SINGLE TASK ASYNC MODE ( 2 threads ) ###################################### # two threads to compete for a single task p.set_size(2) self._assert_single_task(p, True) - + # real stress test- should be native on every dual-core cpu with 2 hardware # threads per core p.set_size(4) self._assert_single_task(p, True) - - + + # DEPENDENT TASK ASYNC MODE ########################### self._assert_async_dependent_tasks(p) - - print("Done with everything", file=sys.stderr) - + + sys.stderr.write("Done with everything\n") diff --git a/async/test/test_thread.py b/async/test/test_thread.py index e5e5158..b575a52 100644 --- a/async/test/test_thread.py +++ b/async/test/test_thread.py @@ -6,36 +6,40 @@ """ Test thead classes and functions""" from .lib import * from async.thread import * -from queue import Queue +try: + from queue import Queue +except ImportError: + from Queue import Queue + import time class TestWorker(WorkerThread): def __init__(self, *args, **kwargs): super(TestWorker, self).__init__(*args, **kwargs) self.reset() - + def fun(self, arg): self.called = True - self.arg = arg + self.arg = arg return True - + def make_assertion(self): assert self.called assert self.arg self.reset() - + def reset(self): self.called = False self.arg = None - + class TestThreads(TestBase): - + @terminate_threads def test_worker_thread(self): worker = TestWorker() assert isinstance(worker.start(), WorkerThread) - + # test different method types standalone_func = lambda *args, **kwargs: worker.fun(*args, **kwargs) for function in (TestWorker.fun, worker.fun, standalone_func): @@ -43,6 +47,5 @@ def test_worker_thread(self): time.sleep(0.01) 
worker.make_assertion() # END for each function type - + worker.stop_and_join() - diff --git a/async/thread.py b/async/thread.py index 7aacf6d..e85eb3b 100644 --- a/async/thread.py +++ b/async/thread.py @@ -7,13 +7,16 @@ __docformat__ = "restructuredtext" import threading import inspect -import queue +try: + import queue +except ImportError: + import Queue as queue import sys -__all__ = ('do_terminate_threads', 'terminate_threads', 'TerminatableThread', - 'WorkerThread') - +__all__ = ('do_terminate_threads', 'terminate_threads', 'TerminatableThread', + 'WorkerThread') + #{ Decorators @@ -39,36 +42,36 @@ def wrapper(*args, **kwargs): finally: do_terminate_threads(set(threading.enumerate()) - cur_threads) # END finally shutdown threads - # END wrapper + # END wrapper wrapper.__name__ = func.__name__ return wrapper #} END decorators #{ Classes - + class TerminatableThread(threading.Thread): """A simple thread able to terminate itself on behalf of the user. - + Terminate a thread as follows: - + t.stop_and_join() - - Derived classes call _should_terminate() to determine whether they should + + Derived classes call _should_terminate() to determine whether they should abort gracefully """ __slots__ = '_terminate' - + def __init__(self): super(TerminatableThread, self).__init__() self._terminate = False - - + + #{ Subclass Interface def _should_terminate(self): """:return: True if this thread should terminate its operation immediately""" return self._terminate - + def _terminated(self): """Called once the thread terminated. Its called in the main thread and may perform cleanup operations""" @@ -78,15 +81,15 @@ def start(self): """Start the thread and return self""" super(TerminatableThread, self).start() return self - + #} END subclass interface - + #{ Interface def schedule_termination(self): """Schedule this thread to be terminated as soon as possible. 
:note: this method does not block.""" self._terminate = True - + def stop_and_join(self): """Ask the thread to stop its operation and wait for it to terminate :note: Depending on the implenetation, this might block a moment""" @@ -94,59 +97,59 @@ def stop_and_join(self): self.join() self._terminated() #} END interface - - + + class StopProcessing(Exception): """If thrown in a function processed by a WorkerThread, it will terminate""" - + class WorkerThread(TerminatableThread): """ This base allows to call functions on class instances natively. - As it is meant to work with a pool, the result of the call must be + As it is meant to work with a pool, the result of the call must be handled by the callee. - The thread runs forever unless it receives the terminate signal using + The thread runs forever unless it receives the terminate signal using its task queue. - + Tasks could be anything, but should usually be class methods and arguments to allow the following: - + inq = Queue() w = WorkerThread(inq) w.start() inq.put((WorkerThread., args, kwargs)) - + finally we call quit to terminate asap. 
- + alternatively, you can make a call more intuitively - the output is the output queue allowing you to get the result right away or later w.call(arg, kwarg='value').get() - + inq.put(WorkerThread.quit) w.join() - + You may provide the following tuples as task: t[0] = class method, function or instance method t[1] = optional, tuple or list of arguments to pass to the routine t[2] = optional, dictionary of keyword arguments to pass to the routine """ __slots__ = ('inq') - - - # define how often we should check for a shutdown request in case our + + + # define how often we should check for a shutdown request in case our # taskqueue is empty shutdown_check_time_s = 0.5 - + def __init__(self, inq = None): super(WorkerThread, self).__init__() self.inq = inq if inq is None: self.inq = queue.Queue() - + @classmethod def stop(cls, *args): """If send via the inq of the thread, it will stop once it processed the function""" raise StopProcessing - + def run(self): """Process input tasks until we receive the quit signal""" gettask = self.inq.get @@ -154,19 +157,19 @@ def run(self): if self._should_terminate(): break # END check for stop request - - # note: during shutdown, this turns None in the middle of waiting - # for an item to be put onto it - we can't du anything about it - - # even if we catch everything and break gracefully, the parent + + # note: during shutdown, this turns None in the middle of waiting + # for an item to be put onto it - we can't du anything about it - + # even if we catch everything and break gracefully, the parent # call will think we failed with an empty exception. # Hence we just don't do anything about it. Alternatively - # we could override the start method to get our own bootstrapping, + # we could override the start method to get our own bootstrapping, # which would mean repeating plenty of code in of the threading module. 
tasktuple = gettask() - + # needing exactly one function, and one arg routine, arg = tasktuple - + try: try: rval = None @@ -184,7 +187,7 @@ def run(self): # END make routine call finally: # make sure we delete the routine to release the reference as soon - # as possible. Otherwise objects might not be destroyed + # as possible. Otherwise objects might not be destroyed # while we are waiting del(routine) del(tasktuple) @@ -192,19 +195,19 @@ def run(self): break except Exception as e: sys.stderr.write("%s: Task %s raised unhandled exception: %s - this really shouldn't happen !\n" % (self.getName(), str(tasktuple), str(e))) - continue # just continue + continue # just continue # END routine exception handling - + # END handle routine release # END endless loop - + def stop_and_join(self): - """Send stop message to ourselves - we don't block, the thread will terminate + """Send stop message to ourselves - we don't block, the thread will terminate once it has finished processing its input queue to receive our termination event""" - # DONT call superclass as it will try to join - join's don't work for + # DONT call superclass as it will try to join - join's don't work for # some reason, as python apparently doesn't switch threads (so often) - # while waiting ... I don't know, but the threads respond properly, + # while waiting ... 
I don't know, but the threads respond properly, # but only if dear python switches to them self.inq.put((self.stop, None)) #} END classes diff --git a/async/util.py b/async/util.py index 16955ff..7bf7abc 100644 --- a/async/util.py +++ b/async/util.py @@ -7,19 +7,20 @@ from threading import ( Lock, _allocate_lock, - _sleep, _time, - ) +) -from queue import ( - Empty, - ) +try: + from queue import Empty +except ImportError: + from Queue import Empty from collections import deque import sys import os +import time -#{ Routines +#{ Routines def cpu_count(): """:return:number of CPUs in the system @@ -35,12 +36,12 @@ def cpu_count(): except (ValueError, KeyError, OSError, AttributeError): pass # END exception handling - + if num == 0: raise NotImplementedError('cannot determine number of cpus') - + return num - + #} END routines @@ -48,13 +49,13 @@ def cpu_count(): class DummyLock(object): """An object providing a do-nothing lock interface for use in sync mode""" __slots__ = tuple() - + def acquire(self): pass - + def release(self): pass - + class SyncQueue(deque): """Adapter to allow using a deque like a queue, without locking""" @@ -67,23 +68,23 @@ def get(self, block=True, timeout=None): def empty(self): return len(self) == 0 - + def set_writable(self, state): pass - + def writable(self): return True def put(self, item, block=True, timeout=None): self.append(item) - + class HSCondition(deque): - """Cleaned up code of the original condition object in order + """Cleaned up code of the original condition object in order to make it run and respond faster.""" __slots__ = ("_lock") delay = 0.0002 # reduces wait times, but increases overhead - + def __init__(self, lock=None): if lock is None: lock = Lock() @@ -91,7 +92,7 @@ def __init__(self, lock=None): def release(self): self._lock.release() - + def acquire(self, block=None): if block is None: self._lock.acquire() @@ -102,8 +103,8 @@ def wait(self, timeout=None): waiter = _allocate_lock() waiter.acquire() # get it the 
first time, no blocking self.append(waiter) - - + + try: # restore state no matter what (e.g., KeyboardInterrupt) # now we block, as we hold the lock already @@ -112,11 +113,11 @@ def wait(self, timeout=None): if timeout is None: waiter.acquire() else: - # Balancing act: We can't afford a pure busy loop, because of the + # Balancing act: We can't afford a pure busy loop, because of the # GIL, so we have to sleep # We try to sleep only tiny amounts of time though to be very responsive - # NOTE: this branch is not used by the async system anyway, but - # will be hit when the user reads with timeout + # NOTE: this branch is not used by the async system anyway, but + # will be hit when the user reads with timeout endtime = _time() + timeout delay = self.delay acquire = waiter.acquire @@ -130,14 +131,14 @@ def wait(self, timeout=None): # this makes 4 threads working as good as two, but of course # it causes more frequent micro-sleeping #delay = min(delay * 2, remaining, .05) - _sleep(delay) + time.sleep(delay) # END endless loop if not gotit: try: self.remove(waiter) except AttributeError: # handle python 2.4 - actually this should be made thread-safe - # but lets see ... + # but lets see ... try: # lets hope we pop the right one - we don't loop over it # yet-we just keep minimal compatability with py 2.4 @@ -150,13 +151,13 @@ def wait(self, timeout=None): pass # END didn't ever get it finally: - # reacquire the lock + # reacquire the lock self._lock.acquire() # END assure release lock - + def notify(self, n=1): - """Its vital that this method is threadsafe - we absolutely have to - get a lock at the beginning of this method to be sure we get the + """Its vital that this method is threadsafe - we absolutely have to + get a lock at the beginning of this method to be sure we get the correct amount of waiters back. 
If we bail out, although a waiter is about to be added, it will miss its wakeup notification, and block forever (possibly)""" @@ -177,10 +178,10 @@ def notify(self, n=1): finally: self._lock.release() # END assure lock is released - + def notify_all(self): self.notify(len(self)) - + class ReadOnly(Exception): """Thrown when trying to write to a read-only queue""" @@ -191,12 +192,12 @@ class AsyncQueue(deque): that there is nothing more to get here. All default-queue code was cleaned up for performance.""" __slots__ = ('mutex', 'not_empty', '_writable') - + def __init__(self, maxsize=0): self.mutex = Lock() self.not_empty = HSCondition(self.mutex) self._writable = True - + def qsize(self): self.mutex.acquire() try: @@ -237,11 +238,11 @@ def empty(self): def put(self, item, block=True, timeout=None): self.mutex.acquire() # NOTE: we explicitly do NOT check for our writable state - # Its just used as a notification signal, and we need to be able + # Its just used as a notification signal, and we need to be able # to continue writing to prevent threads ( easily ) from failing # to write their computed results, which we want in fact # NO: we want them to fail and stop processing, as the one who caused - # the channel to close had a reason and wants the threads to + # the channel to close had a reason and wants the threads to # stop on the task as soon as possible if not self._writable: self.mutex.release() @@ -250,7 +251,7 @@ def put(self, item, block=True, timeout=None): self.append(item) self.mutex.release() self.not_empty.notify() - + def get(self, block=True, timeout=None): self.mutex.acquire() try: @@ -267,7 +268,7 @@ def get(self, block=True, timeout=None): self.not_empty.wait(remaining) # END handle timeout mode # END handle block - + # can throw if we woke up because we are not writable anymore try: return self.popleft()