@@ -49,7 +49,7 @@ def __init__(self, max_mutation_bytes=MAX_MUTATION_SIZE, flush_count=FLUSH_COUNT
4949 self ._queue = queue .Queue ()
5050 self .total_mutation_count = 0
5151 self .total_size = 0
52- self .max_row_bytes = max_row_bytes
52+ self .max_mutation_bytes = max_mutation_bytes
5353 self .flush_count = flush_count
5454
5555 def get (self ):
@@ -74,7 +74,7 @@ def full(self):
7474 """Check if the queue is full."""
7575 if (
7676 self .total_mutation_count >= self .flush_count
77- or self .total_size >= self .max_row_bytes
77+ or self .total_size >= self .max_mutation_bytes
7878 ):
7979 return True
8080 return False
@@ -94,7 +94,9 @@ class BatchInfo:
9494
9595class FlowControl (object ):
9696 def __init__ (
97- self , max_mutations = MAX_OUTSTANDING_ELEMENTS , max_row_bytes = MAX_OUTSTANDING_BYTES
97+ self ,
98+ max_mutations = MAX_OUTSTANDING_ELEMENTS ,
99+ max_mutation_bytes = MAX_OUTSTANDING_BYTES ,
98100 ):
99101 """Control the inflight requests. Keep track of the mutations, row bytes and row counts.
100102 As requests to backend are being made, adjust the number of mutations being processed.
@@ -103,22 +105,21 @@ def __init__(
103105 Reopen the flow as requests are finished.
104106 """
105107 self .max_mutations = max_mutations
106- self .max_row_bytes = max_row_bytes
108+ self .max_mutation_bytes = max_mutation_bytes
107109 self .inflight_mutations = 0
108110 self .inflight_size = 0
109- self .inflight_rows_count = 0 # TODO: FOR DEBUG, DELETE before merging
110111 self .event = threading .Event ()
111112 self .event .set ()
112113
113114 def is_blocked (self ):
114115 """Returns True if:
115116 - inflight mutations >= max_mutations, or
116- - inflight bytes size >= max_row_bytes , or
117+ - inflight bytes size >= max_mutation_bytes , or
117118 """
118119
119120 return (
120121 self .inflight_mutations >= self .max_mutations
121- or self .inflight_size >= self .max_row_bytes
122+ or self .inflight_size >= self .max_mutation_bytes
122123 )
123124
124125 def control_flow (self , batch_info ):
@@ -128,8 +129,6 @@ def control_flow(self, batch_info):
128129
129130 self .inflight_mutations += batch_info .mutations_count
130131 self .inflight_size += batch_info .mutations_size
131- self .inflight_rows_count += batch_info .rows_count
132-
133132 self .set_flow_control_status ()
134133 self .wait ()
135134
@@ -157,7 +156,6 @@ def release(self, batch_info):
157156 """
158157 self .inflight_mutations -= batch_info .mutations_count
159158 self .inflight_size -= batch_info .mutations_size
160- self .inflight_rows_count -= batch_info .rows_count
161159 self .set_flow_control_status ()
162160
163161
@@ -204,7 +202,7 @@ def __init__(
204202 flush_interval = 1 ,
205203 ):
206204 self ._rows = _MutationsBatchQueue (
207- max_row_bytes = max_row_bytes , flush_count = flush_count
205+ max_mutation_bytes = max_row_bytes , flush_count = flush_count
208206 )
209207 self .table = table
210208 self ._executor = concurrent .futures .ThreadPoolExecutor ()
@@ -213,7 +211,7 @@ def __init__(
213211 self ._timer .start ()
214212 self .flow_control = FlowControl (
215213 max_mutations = MAX_OUTSTANDING_ELEMENTS ,
216- max_row_bytes = MAX_OUTSTANDING_BYTES ,
214+ max_mutation_bytes = MAX_OUTSTANDING_BYTES ,
217215 )
218216 self .futures_mapping = {}
219217 self .exceptions = queue .Queue ()
@@ -224,7 +222,7 @@ def flush_count(self):
224222
225223 @property
226224 def max_row_bytes (self ):
227- return self ._rows .max_row_bytes
225+ return self ._rows .max_mutation_bytes
228226
229227 def __enter__ (self ):
230228 """Starting the MutationsBatcher as a context manager"""
@@ -325,7 +323,7 @@ def flush_async(self):
325323 rows_count >= self .flush_count
326324 or mutations_size >= self .max_row_bytes
327325 or mutations_count >= self .flow_control .max_mutations
328- or mutations_size >= self .flow_control .max_row_bytes
326+ or mutations_size >= self .flow_control .max_mutation_bytes
329327 or self ._rows .empty () # submit when it reached the end of the queue
330328 ):
331329 self .flow_control .control_flow (batch_info )
0 commit comments