Skip to content

Commit 1762973

Browse files
HDDS-1512. Implement DoubleBuffer in OzoneManager.
1 parent a85451c commit 1762973

File tree

10 files changed

+974
-0
lines changed

10 files changed

+974
-0
lines changed
Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.ozone.om.ratis;
20+
21+
import java.io.IOException;
22+
import java.util.Queue;
23+
import java.util.concurrent.ConcurrentLinkedQueue;
24+
import java.util.concurrent.atomic.AtomicLong;
25+
26+
import org.apache.hadoop.ozone.om.OMMetadataManager;
27+
import org.apache.hadoop.ozone.om.ratis.helpers.DoubleBufferEntry;
28+
import org.apache.hadoop.ozone.om.response.OMClientResponse;
29+
import org.apache.hadoop.util.Daemon;
30+
import org.apache.hadoop.utils.db.BatchOperation;
31+
32+
import org.apache.ratis.util.ExitUtils;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
35+
36+
/**
37+
* This class implements DoubleBuffer implementation of OMClientResponse's. In
38+
* DoubleBuffer it has 2 buffers one is currentBuffer and other is
39+
* readyBuffer. The current OM requests will be always added to currentBuffer.
40+
* Flush thread will be running in background, it check's if currentBuffer has
41+
* any entries, it swaps the buffer and creates a batch and commit to DB.
42+
* Adding OM request to doubleBuffer and swap of buffer are synchronized
43+
* methods.
44+
*
45+
*/
46+
public class OzoneManagerDoubleBuffer {
47+
48+
private static final Logger LOG =
49+
LoggerFactory.getLogger(OzoneManagerDoubleBuffer.class.getName());
50+
51+
// Taken unbounded queue, if sync thread is taking too long time, we
52+
// might end up taking huge memory to add entries to the buffer.
53+
// TODO: We can avoid this using unbounded queue and use queue with
54+
// capacity, if queue is full we can wait for sync to be completed to
55+
// add entries. But in this also we might block rpc handlers, as we
56+
// clear entries after sync. Or we can come up with a good approach to
57+
// solve this.
58+
private Queue<DoubleBufferEntry<OMClientResponse>> currentBuffer;
59+
private Queue<DoubleBufferEntry<OMClientResponse>> readyBuffer;
60+
61+
private Daemon daemon;
62+
private final OMMetadataManager omMetadataManager;
63+
private final AtomicLong flushedTransactionCount = new AtomicLong(0);
64+
private final AtomicLong flushIterations = new AtomicLong(0);
65+
private volatile boolean isRunning;
66+
67+
68+
public OzoneManagerDoubleBuffer(OMMetadataManager omMetadataManager) {
69+
this.currentBuffer = new ConcurrentLinkedQueue<>();
70+
this.readyBuffer = new ConcurrentLinkedQueue<>();
71+
this.omMetadataManager = omMetadataManager;
72+
73+
isRunning = true;
74+
// Daemon thread which runs in back ground and flushes transactions to DB.
75+
daemon = new Daemon(this::flushTransactions);
76+
daemon.setName("OMDoubleBufferFlushThread");
77+
daemon.start();
78+
79+
}
80+
81+
/**
82+
* Runs in a background thread and batches the transaction in currentBuffer
83+
* and commit to DB.
84+
*/
85+
private void flushTransactions() {
86+
while(isRunning) {
87+
try {
88+
if (canFlush()) {
89+
setReadyBuffer();
90+
final BatchOperation batchOperation = omMetadataManager.getStore()
91+
.initBatchOperation();
92+
93+
readyBuffer.iterator().forEachRemaining((entry) -> {
94+
try {
95+
entry.getResponse().addToDBBatch(omMetadataManager,
96+
batchOperation);
97+
} catch (IOException ex) {
98+
// During Adding to RocksDB batch entry got an exception.
99+
// We should terminate the OM.
100+
terminate(ex);
101+
}
102+
});
103+
104+
omMetadataManager.getStore().commitBatchOperation(batchOperation);
105+
int flushedTransactionsSize = readyBuffer.size();
106+
flushedTransactionCount.addAndGet(flushedTransactionsSize);
107+
flushIterations.incrementAndGet();
108+
109+
LOG.debug("Sync Iteration {} flushed transactions in this " +
110+
"iteration{}", flushIterations.get(),
111+
flushedTransactionsSize);
112+
readyBuffer.clear();
113+
// TODO: update the last updated index in OzoneManagerStateMachine.
114+
}
115+
} catch (InterruptedException ex) {
116+
Thread.currentThread().interrupt();
117+
if (isRunning) {
118+
final String message = "OMDoubleBuffer flush thread " +
119+
Thread.currentThread().getName() + " encountered Interrupted " +
120+
"exception while running";
121+
ExitUtils.terminate(1, message, ex, LOG);
122+
} else {
123+
LOG.info("OMDoubleBuffer flush thread " +
124+
Thread.currentThread().getName() + " is interrupted and will " +
125+
"exit. {}", Thread.currentThread().getName());
126+
}
127+
} catch (IOException ex) {
128+
terminate(ex);
129+
} catch (Throwable t) {
130+
final String s = "OMDoubleBuffer flush thread" +
131+
Thread.currentThread().getName() + "encountered Throwable error";
132+
ExitUtils.terminate(2, s, t, LOG);
133+
}
134+
}
135+
}
136+
137+
/**
138+
* Stop OM DoubleBuffer flush thread.
139+
*/
140+
public synchronized void stop() {
141+
if (isRunning) {
142+
LOG.info("Stopping OMDoubleBuffer flush thread");
143+
isRunning = false;
144+
daemon.interrupt();
145+
} else {
146+
LOG.info("OMDoubleBuffer flush thread is not running.");
147+
}
148+
149+
}
150+
151+
private void terminate(IOException ex) {
152+
String message = "During flush to DB encountered error in " +
153+
"OMDoubleBuffer flush thread " + Thread.currentThread().getName();
154+
ExitUtils.terminate(1, message, ex, LOG);
155+
}
156+
157+
/**
158+
* Returns the flushed transaction count to OM DB.
159+
* @return flushedTransactionCount
160+
*/
161+
public long getFlushedTransactionCount() {
162+
return flushedTransactionCount.get();
163+
}
164+
165+
/**
166+
* Returns total number of flush iterations run by sync thread.
167+
* @return flushIterations
168+
*/
169+
public long getFlushIterations() {
170+
return flushIterations.get();
171+
}
172+
173+
/**
174+
* Add OmResponseBufferEntry to buffer.
175+
* @param response
176+
* @param transactionIndex
177+
*/
178+
public synchronized void add(OMClientResponse response,
179+
long transactionIndex) {
180+
currentBuffer.add(new DoubleBufferEntry<>(transactionIndex, response));
181+
notify();
182+
}
183+
184+
/**
185+
* Check can we flush transactions or not. This method wait's until
186+
* currentBuffer size is greater than zero, once currentBuffer size is
187+
* greater than zero it gets notify signal, and it returns true
188+
* indicating that we are ready to flush.
189+
*
190+
* @return boolean
191+
*/
192+
private synchronized boolean canFlush() throws InterruptedException {
193+
// When transactions are added to buffer it notifies, then we check if
194+
// currentBuffer size once and return from this method.
195+
while (currentBuffer.size() == 0) {
196+
wait(Long.MAX_VALUE);
197+
}
198+
return true;
199+
}
200+
201+
/**
202+
* Prepares the readyBuffer which is used by sync thread to flush
203+
* transactions to OM DB. This method swaps the currentBuffer and readyBuffer.
204+
*/
205+
private synchronized void setReadyBuffer() {
206+
Queue<DoubleBufferEntry<OMClientResponse>> temp = currentBuffer;
207+
currentBuffer = readyBuffer;
208+
readyBuffer = temp;
209+
}
210+
211+
}
212+
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.ozone.om.ratis.helpers;
20+
21+
import org.apache.hadoop.ozone.om.response.OMClientResponse;
22+
23+
/**
24+
* Entry in OzoneManagerDouble Buffer.
25+
* @param <Response>
26+
*/
27+
public class DoubleBufferEntry<Response extends OMClientResponse> {
28+
29+
private long trxLogIndex;
30+
private Response response;
31+
32+
public DoubleBufferEntry(long trxLogIndex, Response response) {
33+
this.trxLogIndex = trxLogIndex;
34+
this.response = response;
35+
}
36+
37+
public long getTrxLogIndex() {
38+
return trxLogIndex;
39+
}
40+
41+
public Response getResponse() {
42+
return response;
43+
}
44+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with this
4+
* work for additional information regarding copyright ownership. The ASF
5+
* licenses this file to you under the Apache License, Version 2.0 (the
6+
* "License"); you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
* <p>
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
* <p>
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
13+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14+
* License for the specific language governing permissions and limitations under
15+
* the License.
16+
*/
17+
/**
18+
* package which contains helper classes for each OM request response.
19+
*/
20+
package org.apache.hadoop.ozone.om.ratis.helpers;
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.ozone.om.response;
20+
21+
import java.io.IOException;
22+
23+
import org.apache.hadoop.ozone.om.OMMetadataManager;
24+
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
25+
import org.apache.hadoop.utils.db.BatchOperation;
26+
27+
/**
28+
* Response for CreateBucket request.
29+
*/
30+
public final class OMBucketCreateResponse implements OMClientResponse {
31+
32+
private final OmBucketInfo omBucketInfo;
33+
34+
public OMBucketCreateResponse(OmBucketInfo omBucketInfo) {
35+
this.omBucketInfo = omBucketInfo;
36+
}
37+
38+
@Override
39+
public void addToDBBatch(OMMetadataManager omMetadataManager,
40+
BatchOperation batchOperation) throws IOException {
41+
String dbBucketKey =
42+
omMetadataManager.getBucketKey(omBucketInfo.getVolumeName(),
43+
omBucketInfo.getBucketName());
44+
omMetadataManager.getBucketTable().putWithBatch(batchOperation, dbBucketKey,
45+
omBucketInfo);
46+
}
47+
48+
public OmBucketInfo getOmBucketInfo() {
49+
return omBucketInfo;
50+
}
51+
}
52+
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.ozone.om.response;
20+
21+
import java.io.IOException;
22+
23+
import org.apache.hadoop.ozone.om.OMMetadataManager;
24+
import org.apache.hadoop.utils.db.BatchOperation;
25+
26+
/**
27+
* Response for DeleteBucket request.
28+
*/
29+
public final class OMBucketDeleteResponse implements OMClientResponse {
30+
31+
private String volumeName;
32+
private String bucketName;
33+
34+
public OMBucketDeleteResponse(
35+
String volumeName, String bucketName) {
36+
this.volumeName = volumeName;
37+
this.bucketName = bucketName;
38+
}
39+
40+
@Override
41+
public void addToDBBatch(OMMetadataManager omMetadataManager,
42+
BatchOperation batchOperation) throws IOException {
43+
String dbBucketKey =
44+
omMetadataManager.getBucketKey(volumeName, bucketName);
45+
omMetadataManager.getBucketTable().deleteWithBatch(batchOperation,
46+
dbBucketKey);
47+
}
48+
49+
public String getVolumeName() {
50+
return volumeName;
51+
}
52+
53+
public String getBucketName() {
54+
return bucketName;
55+
}
56+
}
57+

0 commit comments

Comments
 (0)