Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions cassandra/concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@
from threading import Condition
import sys

from cassandra.cluster import ResultSet
from cassandra.cluster import ResultSet, EXEC_PROFILE_DEFAULT

import logging
log = logging.getLogger(__name__)


ExecutionResult = namedtuple('ExecutionResult', ['success', 'result_or_exc'])

def execute_concurrent(session, statements_and_parameters, concurrency=100, raise_on_first_error=True, results_generator=False):
def execute_concurrent(session, statements_and_parameters, concurrency=100, raise_on_first_error=True, results_generator=False, execution_profile=EXEC_PROFILE_DEFAULT):
"""
Executes a sequence of (statement, parameters) tuples concurrently. Each
``parameters`` item must be a sequence or :const:`None`.
Expand All @@ -56,6 +56,9 @@ def execute_concurrent(session, statements_and_parameters, concurrency=100, rais
footprint is marginal CPU overhead (more thread coordination and sorting out-of-order results
on-the-fly).

`execution_profile` argument is the execution profile to use for this
request, it is passed directly to :meth:`Session.execute_async`.

A sequence of ``ExecutionResult(success, result_or_exc)`` namedtuples is returned
in the same order that the statements were passed in. If ``success`` is :const:`False`,
there was an error executing the statement, and ``result_or_exc`` will be
Expand Down Expand Up @@ -90,17 +93,19 @@ def execute_concurrent(session, statements_and_parameters, concurrency=100, rais
if not statements_and_parameters:
return []

executor = ConcurrentExecutorGenResults(session, statements_and_parameters) if results_generator else ConcurrentExecutorListResults(session, statements_and_parameters)
executor = ConcurrentExecutorGenResults(session, statements_and_parameters, execution_profile) \
if results_generator else ConcurrentExecutorListResults(session, statements_and_parameters, execution_profile)
return executor.execute(concurrency, raise_on_first_error)


class _ConcurrentExecutor(object):

max_error_recursion = 100

def __init__(self, session, statements_and_params):
def __init__(self, session, statements_and_params, execution_profile):
self.session = session
self._enum_statements = enumerate(iter(statements_and_params))
self._execution_profile = execution_profile
self._condition = Condition()
self._fail_fast = False
self._results_queue = []
Expand Down Expand Up @@ -132,7 +137,7 @@ def _execute_next(self):
def _execute(self, idx, statement, params):
self._exec_depth += 1
try:
future = self.session.execute_async(statement, params, timeout=None)
future = self.session.execute_async(statement, params, timeout=None, execution_profile=self._execution_profile)
args = (future, idx)
future.add_callbacks(
callback=self._on_success, callback_args=args,
Expand Down