|
26 | 26 | import org.apache.hadoop.hdds.HddsUtils; |
27 | 27 | import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; |
28 | 28 |
|
| 29 | +import org.apache.hadoop.hdds.scm.ScmConfigKeys; |
29 | 30 | import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; |
30 | 31 | import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; |
31 | 32 | import org.apache.hadoop.ozone.OzoneConfigKeys; |
|
79 | 80 | import java.util.concurrent.ConcurrentHashMap; |
80 | 81 | import java.util.concurrent.ThreadPoolExecutor; |
81 | 82 | import java.util.concurrent.ExecutorService; |
| 83 | +import java.util.concurrent.Semaphore; |
82 | 84 | import java.util.concurrent.TimeUnit; |
83 | 85 | import java.util.concurrent.ExecutionException; |
84 | 86 | import java.util.stream.Collectors; |
@@ -146,6 +148,8 @@ public class ContainerStateMachine extends BaseStateMachine { |
146 | 148 | private final Cache<Long, ByteString> stateMachineDataCache; |
147 | 149 | private final boolean isBlockTokenEnabled; |
148 | 150 | private final TokenVerifier tokenVerifier; |
| 151 | + |
| 152 | + private final Semaphore applyTransactionSemaphore; |
149 | 153 | /** |
150 | 154 | * CSM metrics. |
151 | 155 | */ |
@@ -175,6 +179,12 @@ public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher, |
175 | 179 | final int numContainerOpExecutors = conf.getInt( |
176 | 180 | OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_KEY, |
177 | 181 | OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_DEFAULT); |
| 182 | + int maxPendingApplyTransactions = conf.getInt( |
| 183 | + ScmConfigKeys. |
| 184 | + DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TRANSACTIONS, |
| 185 | + ScmConfigKeys. |
| 186 | + DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TRANSACTIONS_DEFAULT); |
| 187 | + applyTransactionSemaphore = new Semaphore(maxPendingApplyTransactions); |
178 | 188 | this.executors = new ExecutorService[numContainerOpExecutors]; |
179 | 189 | for (int i = 0; i < numContainerOpExecutors; i++) { |
180 | 190 | final int index = i; |
@@ -626,6 +636,7 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) { |
626 | 636 | .setLogIndex(index); |
627 | 637 |
|
628 | 638 | try { |
| 639 | + applyTransactionSemaphore.acquire(); |
629 | 640 | metrics.incNumApplyTransactionsOps(); |
630 | 641 | ContainerCommandRequestProto requestProto = |
631 | 642 | getContainerCommandRequestProto( |
@@ -663,9 +674,9 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) { |
663 | 674 | requestProto.getWriteChunk().getChunkData().getLen()); |
664 | 675 | } |
665 | 676 | updateLastApplied(); |
666 | | - }); |
| 677 | + }).whenComplete((r, t) -> applyTransactionSemaphore.release()); |
667 | 678 | return future; |
668 | | - } catch (IOException e) { |
| 679 | + } catch (IOException | InterruptedException e) { |
669 | 680 | metrics.incNumApplyTransactionsFails(); |
670 | 681 | return completeExceptionally(e); |
671 | 682 | } |
|
0 commit comments