threaded is a set of decorators, which wrap functions in:
- concurrent.futures.ThreadPoolExecutor
- threading.Thread
- asyncio.Task in Python 3.
Why? Because copy-pasting loop.create_task, threading.Thread and thread_pool.submit is boring, especially if the target function is only ever used this way.
Pros:
- Free software: Apache license
- Open Source: https://github.com/python-useful-helpers/threaded
- PyPI packaged: https://pypi.python.org/pypi/threaded
- Tested: see badges on top
Decorators:
- ThreadPooled - native concurrent.futures.ThreadPoolExecutor.
  - threadpooled is an alias for ThreadPooled.
- Threaded - wraps in threading.Thread.
  - threaded is an alias for Threaded.
- AsyncIOTask - wraps in asyncio.Task. Uses the same API as ThreadPooled.
  - asynciotask is an alias for AsyncIOTask.
ThreadPooled is the decorator needed in most cases: it submits the function to a ThreadPoolExecutor on each call.
threaded.ThreadPooled.configure(max_workers=3)
Note
If the executor has not been configured explicitly, it is created with default parameters: max_workers=CPU_COUNT * 5
@threaded.ThreadPooled
def func():
    pass

concurrent.futures.wait([func()])
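To show the boilerplate the decorator replaces, here is a minimal standard-library sketch of the same pattern. The names pooled and _executor are hypothetical and not part of threaded, whose real implementation may differ. The only behaviour assumed is what the example above shows: calling the decorated function returns a concurrent.futures.Future.

```python
import concurrent.futures
import functools

# Hypothetical module-level pool; threaded manages its own executor internally.
_executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)


def pooled(func):
    """Illustrative stand-in: submit func to the pool on every call."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # submit() returns a concurrent.futures.Future immediately.
        return _executor.submit(func, *args, **kwargs)

    return wrapper


@pooled
def add(a, b):
    return a + b


future = add(2, 3)
concurrent.futures.wait([future])
result = future.result()
_executor.shutdown()
```

Writing the wrapper by hand for every function is exactly the copy-paste the package is meant to remove.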
Usage with asyncio:
Note
If loop_getter is not callable, loop_getter_need_context is ignored.
loop = asyncio.get_event_loop()

@threaded.ThreadPooled(loop_getter=loop, loop_getter_need_context=False)
def func():
    pass

loop.run_until_complete(asyncio.wait_for(func(), timeout))
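A rough standard-library equivalent of the asyncio mode can be sketched with loop.run_in_executor. This is an assumption about the general approach, not a copy of the library's code. pooled_in_loop and _executor are hypothetical names, and the sketch uses asyncio.new_event_loop() so that it does not depend on a pre-existing current loop.

```python
import asyncio
import concurrent.futures
import functools

# Hypothetical pool used only by this sketch.
_executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)


def pooled_in_loop(loop):
    """Illustrative stand-in: run func in the pool, return an awaitable bound to loop."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            call = functools.partial(func, *args, **kwargs)
            # run_in_executor returns an asyncio.Future tied to the given loop.
            return loop.run_in_executor(_executor, call)

        return wrapper

    return decorator


loop = asyncio.new_event_loop()


@pooled_in_loop(loop)
def square(x):
    return x * x


result = loop.run_until_complete(asyncio.wait_for(square(4), timeout=5))
loop.close()
_executor.shutdown()
```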
Python 3.5+ usage with asyncio and loop extraction from call arguments:
loop_getter = lambda tgt_loop: tgt_loop

@threaded.ThreadPooled(loop_getter=loop_getter, loop_getter_need_context=True)  # loop_getter_need_context is required
def func(*args, **kwargs):
    pass

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(func(loop), timeout))
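The idea of extracting the loop from call arguments can be illustrated with a small sketch. It assumes that the getter receives the same positional and keyword arguments as the decorated function, which matches the example above but is not confirmed for every case. pooled_with_getter is a hypothetical name.

```python
import asyncio
import functools


def pooled_with_getter(loop_getter):
    """Illustrative stand-in: ask loop_getter for the loop on every call."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Assumption: the getter sees the call context (args and kwargs).
            loop = loop_getter(*args, **kwargs)
            call = functools.partial(func, *args, **kwargs)
            # None selects the loop's default thread pool executor.
            return loop.run_in_executor(None, call)

        return wrapper

    return decorator


@pooled_with_getter(lambda tgt_loop, *args, **kwargs: tgt_loop)
def shout(tgt_loop, text):
    return text.upper()


loop = asyncio.new_event_loop()
result = loop.run_until_complete(asyncio.wait_for(shout(loop, "done"), timeout=5))
loop.close()
```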
During application shutdown, the pool can be stopped. It will be recreated automatically if some component requests it again.
threaded.ThreadPooled.shutdown()
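The stop-and-recreate behaviour described above can be pictured as a lazily built executor. The class below is a hypothetical sketch of that pattern, not the library's internals.

```python
import concurrent.futures
import typing


class LazyPool:
    """Hypothetical holder for an executor that is rebuilt on demand."""

    def __init__(self, max_workers: int = 3) -> None:
        self._max_workers = max_workers
        self._executor: typing.Optional[concurrent.futures.ThreadPoolExecutor] = None

    @property
    def executor(self) -> concurrent.futures.ThreadPoolExecutor:
        # Create the pool on first use, or again after shutdown().
        if self._executor is None:
            self._executor = concurrent.futures.ThreadPoolExecutor(
                max_workers=self._max_workers
            )
        return self._executor

    def shutdown(self) -> None:
        # Wait for running work, then forget the executor.
        if self._executor is not None:
            self._executor.shutdown()
            self._executor = None


pool = LazyPool()
first = pool.executor
before = first.submit(sum, [1, 2, 3]).result()
pool.shutdown()
second = pool.executor  # a fresh executor is created here
after = second.submit(sum, [4, 5]).result()
pool.shutdown()
```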
Classic threading.Thread. Useful for threads that run until closed and for self-closing threads that do not return a value.
Usage example:
@threaded.Threaded
def func(*args, **kwargs):
    pass

thread = func()
thread.start()
thread.join()
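The usage above implies that calling the decorated function returns a thread that has not been started yet. A minimal standard-library sketch of that behaviour follows. in_thread and results are hypothetical names used only for illustration.

```python
import functools
import threading

results = []


def in_thread(func):
    """Illustrative stand-in: build, but do not start, a Thread per call."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return threading.Thread(target=func, args=args, kwargs=kwargs)

    return wrapper


@in_thread
def record(value):
    results.append(value)


thread = record(42)
was_alive_before_start = thread.is_alive()  # the thread is not running yet
thread.start()
thread.join()
```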
Without arguments, the thread name follows the pattern: 'Threaded: ' + func.__name__
Note
If func.__name__ is not accessible, str(hash(func)) will be used instead.
The name can be overridden via the corresponding argument:
@threaded.Threaded(name='Function in thread')
def func(*args, **kwargs):
    pass

The thread can be daemonized automatically:
@threaded.Threaded(daemon=True)
def func(*args, **kwargs):
    pass

Also, if no additional manipulation is expected before the thread starts, it can be started automatically before it is returned:
@threaded.Threaded(started=True)
def func(*args, **kwargs):
    pass
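The name, daemon and started options map naturally onto threading.Thread. The decorator factory below is a hedged sketch of how such options could be wired up, including the documented fallback to str(hash(func)) for the name. in_thread and the done event are hypothetical, and the library itself may handle these options differently.

```python
import functools
import threading


def in_thread(name=None, daemon=False, started=False):
    """Illustrative decorator factory mirroring the options described above."""

    def decorator(func):
        # Fall back to the hash when the callable has no __name__.
        default_name = "Threaded: " + getattr(func, "__name__", str(hash(func)))
        thread_name = name or default_name

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            thread = threading.Thread(
                target=func, name=thread_name, args=args, kwargs=kwargs, daemon=daemon
            )
            if started:
                # Start before returning, so the caller only needs join().
                thread.start()
            return thread

        return wrapper

    return decorator


done = threading.Event()


@in_thread(daemon=True, started=True)
def notify():
    done.set()


thread = notify()
thread.join()
```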
Wrap in asyncio.Task.
Usage with asyncio:
@threaded.AsyncIOTask
def func():
    pass

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(func(), timeout))
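As a point of comparison, the same effect can be written by hand with loop.create_task. The sketch below is an assumption about the pattern, not the library's implementation: as_task is a hypothetical name, the decorated function is written as a coroutine, and the call is made from inside a running loop so that asyncio.get_running_loop() can be used.

```python
import asyncio
import functools


def as_task(func):
    """Illustrative stand-in: schedule the coroutine as a Task on the running loop."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        return loop.create_task(func(*args, **kwargs))

    return wrapper


@as_task
async def double(x):
    await asyncio.sleep(0)
    return x * 2


async def main():
    task = double(21)  # a plain call is enough to schedule the work
    is_task = isinstance(task, asyncio.Task)
    return is_task, await asyncio.wait_for(task, timeout=5)


is_task, result = asyncio.run(main())
```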
Provide event loop directly:
Note
If loop_getter is not callable, loop_getter_need_context is ignored.
loop = asyncio.get_event_loop()

@threaded.AsyncIOTask(loop_getter=loop)
def func():
    pass

loop.run_until_complete(asyncio.wait_for(func(), timeout))
Usage with loop extraction from call arguments:
loop_getter = lambda tgt_loop: tgt_loop

@threaded.AsyncIOTask(loop_getter=loop_getter, loop_getter_need_context=True)
def func(*args, **kwargs):
    pass

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(func(loop), timeout))
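The rule that a non-callable loop_getter makes loop_getter_need_context irrelevant can be made concrete with a short sketch of the loop selection logic. This is a guess at the general shape based on the examples above. as_task is a hypothetical name, and the decorated functions are written as coroutines for the sake of the sketch.

```python
import asyncio
import functools


def as_task(loop_getter, loop_getter_need_context=False):
    """Illustrative stand-in: pick a loop, then wrap the coroutine in a Task."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if not callable(loop_getter):
                # A loop object was given directly: the context flag is ignored.
                loop = loop_getter
            elif loop_getter_need_context:
                # Assumption: the getter receives the call arguments.
                loop = loop_getter(*args, **kwargs)
            else:
                loop = loop_getter()
            return loop.create_task(func(*args, **kwargs))

        return wrapper

    return decorator


loop = asyncio.new_event_loop()


@as_task(loop_getter=loop)
async def direct():
    return "direct"


@as_task(loop_getter=lambda tgt_loop: tgt_loop, loop_getter_need_context=True)
async def from_args(tgt_loop):
    return "from args"


direct_result = loop.run_until_complete(asyncio.wait_for(direct(), timeout=5))
args_result = loop.run_until_complete(asyncio.wait_for(from_args(loop), timeout=5))
loop.close()
```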
The main test mechanism for the threaded package is tox. Available environments can be listed via tox -l
For code checking, several CI systems are used in parallel:
- GitHub Actions: used for checking PEP8, pylint, bandit, installability and unit tests.
- coveralls: used for coverage display.
GitHub Actions is also used for package delivery to PyPI.