Skip to content

multiprocessing Pool (and context manager) fixes/improvements #1562

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 29, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 13 additions & 52 deletions stdlib/3/multiprocessing/__init__.pyi
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
# Stubs for multiprocessing

from typing import Any, Callable, Iterable, Mapping, Optional, Dict, List, Union, TypeVar
from typing import (
Any, Callable, ContextManager, Iterable, Mapping, Optional, Dict, List,
Union, TypeVar,
)

from logging import Logger
from multiprocessing import pool
from multiprocessing.context import BaseContext
from multiprocessing.managers import SyncManager
from multiprocessing.pool import AsyncResult
Expand All @@ -12,11 +16,9 @@ import queue

_T = TypeVar('_T')

class Lock():
class Lock(ContextManager[Lock]):
def acquire(self, block: bool = ..., timeout: int = ...) -> None: ...
def release(self) -> None: ...
def __enter__(self) -> 'Lock': ...
def __exit__(self, exc_type, exc_value, tb) -> None: ...

class Event(object):
def __init__(self, *, ctx: BaseContext) -> None: ...
Expand All @@ -25,54 +27,13 @@ class Event(object):
def clear(self) -> None: ...
def wait(self, timeout: Optional[int] = ...) -> bool: ...

class Pool():
def __init__(self, processes: Optional[int] = ...,
initializer: Optional[Callable[..., None]] = ...,
initargs: Iterable[Any] = ...,
maxtasksperchild: Optional[int] = ...,
context: Optional[Any] = None) -> None: ...
def apply(self,
func: Callable[..., Any],
args: Iterable[Any] = ...,
kwds: Dict[str, Any]=...) -> Any: ...
def apply_async(self,
func: Callable[..., Any],
args: Iterable[Any] = ...,
kwds: Dict[str, Any] = ...,
callback: Optional[Callable[..., None]] = None,
error_callback: Optional[Callable[[BaseException], None]] = None) -> AsyncResult: ...
def map(self,
func: Callable[..., Any],
iterable: Iterable[Any] = ...,
chunksize: Optional[int] = ...) -> List[Any]: ...
def map_async(self, func: Callable[..., Any],
iterable: Iterable[Any] = ...,
chunksize: Optional[int] = ...,
callback: Optional[Callable[..., None]] = None,
error_callback: Optional[Callable[[BaseException], None]] = None) -> AsyncResult: ...
def imap(self,
func: Callable[..., Any],
iterable: Iterable[Any] = ...,
chunksize: Optional[int] = None) -> Iterable[Any]: ...
def imap_unordered(self,
func: Callable[..., Any],
iterable: Iterable[Any] = ...,
chunksize: Optional[int] = None) -> Iterable[Any]: ...
def starmap(self,
func: Callable[..., Any],
iterable: Iterable[Iterable[Any]] = ...,
chunksize: Optional[int] = None) -> List[Any]: ...
def starmap_async(self,
func: Callable[..., Any],
iterable: Iterable[Iterable[Any]] = ...,
chunksize: Optional[int] = ...,
callback: Optional[Callable[..., None]] = None,
error_callback: Optional[Callable[[BaseException], None]] = None) -> AsyncResult: ...
def close(self) -> None: ...
def terminate(self) -> None: ...
def join(self) -> None: ...
def __enter__(self) -> 'Pool': ...
def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...
# N.B. This is generated at runtime by partially applying
# multiprocessing.context.BaseContext.Pool, so the two signatures should be
# identical (modulo self).
def Pool(processes: Optional[int] = ...,
initializer: Optional[Callable[..., Any]] = ...,
initargs: Iterable[Any] = ...,
maxtasksperchild: Optional[int] = ...) -> pool.Pool: ...

class Process():
name: str
Expand Down
11 changes: 8 additions & 3 deletions stdlib/3/multiprocessing/context.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
from logging import Logger
import multiprocessing
import sys
from typing import Any, Callable, Optional, List, Sequence, Tuple, Type, Union
from typing import (
Any, Callable, Iterable, Optional, List, Sequence, Tuple, Type, Union,
)

class ProcessError(Exception): ...

Expand Down Expand Up @@ -49,13 +51,16 @@ class BaseContext(object):
def JoinableQueue(self, maxsize: int = ...) -> Any: ...
# TODO: change return to SimpleQueue once a stub exists in multiprocessing.queues
def SimpleQueue(self) -> Any: ...
# N.B. This method is partially applied at runtime to generate
# multiprocessing.Pool, so the two signatures should be identical (modulo
# self).
def Pool(
self,
processes: Optional[int] = ...,
initializer: Optional[Callable[..., Any]] = ...,
initargs: Tuple = ...,
initargs: Iterable[Any] = ...,
maxtasksperchild: Optional[int] = ...
) -> multiprocessing.Pool: ...
) -> multiprocessing.pool.Pool: ...
# TODO: typecode_or_type param is a ctype with a base class of _SimpleCData or array.typecode Need to figure out
# how to handle the ctype
# TODO: change return to RawValue once a stub exists in multiprocessing.sharedctypes
Expand Down
7 changes: 3 additions & 4 deletions stdlib/3/multiprocessing/managers.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import queue
import threading
from typing import (
Any, Callable, Dict, Iterable, List, Mapping, Optional, Sequence, TypeVar
Any, Callable, ContextManager, Dict, Iterable, List, Mapping, Optional,
Sequence, TypeVar,
)

_T = TypeVar('_T')
Expand All @@ -16,13 +17,11 @@ class Namespace: ...

_Namespace = Namespace

class BaseManager:
class BaseManager(ContextManager[BaseManager]):
def register(self, typeid: str, callable: Any = ...) -> None: ...
def shutdown(self) -> None: ...
def start(self, initializer: Optional[Callable[..., Any]] = ...,
initargs: Iterable[Any] = ...) -> None: ...
def __enter__(self) -> 'BaseManager': ...
def __exit__(self, exc_type, exc_value, tb) -> None: ...

class SyncManager(BaseManager):
def BoundedSemaphore(self, value: Any = ...) -> threading.BoundedSemaphore: ...
Expand Down
29 changes: 21 additions & 8 deletions stdlib/3/multiprocessing/pool.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,25 @@

# NOTE: These are incomplete!

from typing import Any, Callable, Iterable, Mapping, Optional, Dict, List
from typing import (
Any, Callable, ContextManager, Iterable, Mapping, Optional, Dict, List,
TypeVar,
)

_T = TypeVar('_T', bound='Pool')

class AsyncResult():
def get(self, timeout: float = ...) -> Any: ...
def wait(self, timeout: float = ...) -> None: ...
def ready(self) -> bool: ...
def successful(self) -> bool: ...

class ThreadPool():
def __init__(self, processes: Optional[int] = None,
initializer: Optional[Callable[..., Any]] = None,
initargs: Iterable[Any] = ...) -> None: ...
class Pool(ContextManager[Pool]):
def __init__(self, processes: Optional[int] = ...,
initializer: Optional[Callable[..., None]] = ...,
initargs: Iterable[Any] = ...,
maxtasksperchild: Optional[int] = ...,
context: Optional[Any] = None) -> None: ...
def apply(self,
func: Callable[..., Any],
args: Iterable[Any] = ...,
Expand All @@ -30,7 +37,7 @@ class ThreadPool():
chunksize: Optional[int] = None) -> List[Any]: ...
def map_async(self, func: Callable[..., Any],
iterable: Iterable[Any] = ...,
chunksize: Optional[Optional[int]] = None,
chunksize: Optional[int] = None,
callback: Optional[Callable[..., None]] = None,
error_callback: Optional[Callable[[BaseException], None]] = None) -> AsyncResult: ...
def imap(self,
Expand All @@ -54,5 +61,11 @@ class ThreadPool():
def close(self) -> None: ...
def terminate(self) -> None: ...
def join(self) -> None: ...
def __enter__(self) -> 'ThreadPool': ...
def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...
def __enter__(self: _T) -> _T: ...


class ThreadPool(Pool, ContextManager[ThreadPool]):

def __init__(self, processes: Optional[int] = None,
initializer: Optional[Callable[..., Any]] = None,
initargs: Iterable[Any] = ...) -> None: ...