diff --git a/cassandra/concurrent.py b/cassandra/concurrent.py index a8bddcbdab..0228f297fe 100644 --- a/cassandra/concurrent.py +++ b/cassandra/concurrent.py @@ -21,7 +21,7 @@ from threading import Condition import sys -from cassandra.cluster import ResultSet +from cassandra.cluster import ResultSet, EXEC_PROFILE_DEFAULT import logging log = logging.getLogger(__name__) @@ -29,7 +29,7 @@ ExecutionResult = namedtuple('ExecutionResult', ['success', 'result_or_exc']) -def execute_concurrent(session, statements_and_parameters, concurrency=100, raise_on_first_error=True, results_generator=False): +def execute_concurrent(session, statements_and_parameters, concurrency=100, raise_on_first_error=True, results_generator=False, execution_profile=EXEC_PROFILE_DEFAULT): """ Executes a sequence of (statement, parameters) tuples concurrently. Each ``parameters`` item must be a sequence or :const:`None`. @@ -56,6 +56,9 @@ def execute_concurrent(session, statements_and_parameters, concurrency=100, rais footprint is marginal CPU overhead (more thread coordination and sorting out-of-order results on-the-fly). + `execution_profile` argument is the execution profile to use for this + request, it is passed directly to :meth:`Session.execute_async`. + A sequence of ``ExecutionResult(success, result_or_exc)`` namedtuples is returned in the same order that the statements were passed in. If ``success`` is :const:`False`, there was an error executing the statement, and ``result_or_exc`` will be @@ -90,7 +93,8 @@ def execute_concurrent(session, statements_and_parameters, concurrency=100, rais if not statements_and_parameters: return [] - executor = ConcurrentExecutorGenResults(session, statements_and_parameters) if results_generator else ConcurrentExecutorListResults(session, statements_and_parameters) + executor = ConcurrentExecutorGenResults(session, statements_and_parameters, execution_profile) \ + if results_generator else ConcurrentExecutorListResults(session, statements_and_parameters, execution_profile) return executor.execute(concurrency, raise_on_first_error) @@ -98,9 +102,10 @@ class _ConcurrentExecutor(object): max_error_recursion = 100 - def __init__(self, session, statements_and_params): + def __init__(self, session, statements_and_params, execution_profile): self.session = session self._enum_statements = enumerate(iter(statements_and_params)) + self._execution_profile = execution_profile self._condition = Condition() self._fail_fast = False self._results_queue = [] @@ -132,7 +137,7 @@ def _execute_next(self): def _execute(self, idx, statement, params): self._exec_depth += 1 try: - future = self.session.execute_async(statement, params, timeout=None) + future = self.session.execute_async(statement, params, timeout=None, execution_profile=self._execution_profile) args = (future, idx) future.add_callbacks( callback=self._on_success, callback_args=args,