Skip to content

Commit 8c4c653

Browse files
authored
Merge pull request #1122 from andy-slac/concurrent-execution-profiles
Adds one more keyword argument `execution_profile` to the `execute_concurrent` method to pass an execution profile. It is fowarded to `Session.execute_async` call.
1 parent e4e290f commit 8c4c653

File tree

1 file changed

+10
-5
lines changed

1 file changed

+10
-5
lines changed

cassandra/concurrent.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,15 @@
2121
from threading import Condition
2222
import sys
2323

24-
from cassandra.cluster import ResultSet
24+
from cassandra.cluster import ResultSet, EXEC_PROFILE_DEFAULT
2525

2626
import logging
2727
log = logging.getLogger(__name__)
2828

2929

3030
ExecutionResult = namedtuple('ExecutionResult', ['success', 'result_or_exc'])
3131

32-
def execute_concurrent(session, statements_and_parameters, concurrency=100, raise_on_first_error=True, results_generator=False):
32+
def execute_concurrent(session, statements_and_parameters, concurrency=100, raise_on_first_error=True, results_generator=False, execution_profile=EXEC_PROFILE_DEFAULT):
3333
"""
3434
Executes a sequence of (statement, parameters) tuples concurrently. Each
3535
``parameters`` item must be a sequence or :const:`None`.
@@ -56,6 +56,9 @@ def execute_concurrent(session, statements_and_parameters, concurrency=100, rais
5656
footprint is marginal CPU overhead (more thread coordination and sorting out-of-order results
5757
on-the-fly).
5858
59+
`execution_profile` argument is the execution profile to use for this
60+
request, it is passed directly to :meth:`Session.execute_async`.
61+
5962
A sequence of ``ExecutionResult(success, result_or_exc)`` namedtuples is returned
6063
in the same order that the statements were passed in. If ``success`` is :const:`False`,
6164
there was an error executing the statement, and ``result_or_exc`` will be
@@ -90,17 +93,19 @@ def execute_concurrent(session, statements_and_parameters, concurrency=100, rais
9093
if not statements_and_parameters:
9194
return []
9295

93-
executor = ConcurrentExecutorGenResults(session, statements_and_parameters) if results_generator else ConcurrentExecutorListResults(session, statements_and_parameters)
96+
executor = ConcurrentExecutorGenResults(session, statements_and_parameters, execution_profile) \
97+
if results_generator else ConcurrentExecutorListResults(session, statements_and_parameters, execution_profile)
9498
return executor.execute(concurrency, raise_on_first_error)
9599

96100

97101
class _ConcurrentExecutor(object):
98102

99103
max_error_recursion = 100
100104

101-
def __init__(self, session, statements_and_params):
105+
def __init__(self, session, statements_and_params, execution_profile):
102106
self.session = session
103107
self._enum_statements = enumerate(iter(statements_and_params))
108+
self._execution_profile = execution_profile
104109
self._condition = Condition()
105110
self._fail_fast = False
106111
self._results_queue = []
@@ -132,7 +137,7 @@ def _execute_next(self):
132137
def _execute(self, idx, statement, params):
133138
self._exec_depth += 1
134139
try:
135-
future = self.session.execute_async(statement, params, timeout=None)
140+
future = self.session.execute_async(statement, params, timeout=None, execution_profile=self._execution_profile)
136141
args = (future, idx)
137142
future.add_callbacks(
138143
callback=self._on_success, callback_args=args,

0 commit comments

Comments
 (0)