Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.om.ratis;

import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.ratis.helpers.DoubleBufferEntry;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.utils.db.BatchOperation;

import org.apache.ratis.util.ExitUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class implements DoubleBuffer implementation of OMClientResponse's. In
* DoubleBuffer it has 2 buffers one is currentBuffer and other is
* readyBuffer. The current OM requests will be always added to currentBuffer.
* Flush thread will be running in background, it check's if currentBuffer has
* any entries, it swaps the buffer and creates a batch and commit to DB.
* Adding OM request to doubleBuffer and swap of buffer are synchronized
* methods.
*
*/
public class OzoneManagerDoubleBuffer {

private static final Logger LOG =
LoggerFactory.getLogger(OzoneManagerDoubleBuffer.class.getName());

// Taken unbounded queue, if sync thread is taking too long time, we
// might end up taking huge memory to add entries to the buffer.
// TODO: We can avoid this using unbounded queue and use queue with
// capacity, if queue is full we can wait for sync to be completed to
// add entries. But in this also we might block rpc handlers, as we
// clear entries after sync. Or we can come up with a good approach to
// solve this.
private Queue<DoubleBufferEntry<OMClientResponse>> currentBuffer;
private Queue<DoubleBufferEntry<OMClientResponse>> readyBuffer;

private Daemon daemon;
private final OMMetadataManager omMetadataManager;
private final AtomicLong flushedTransactionCount = new AtomicLong(0);
private final AtomicLong flushIterations = new AtomicLong(0);
private volatile boolean isRunning;


public OzoneManagerDoubleBuffer(OMMetadataManager omMetadataManager) {
this.currentBuffer = new ConcurrentLinkedQueue<>();
this.readyBuffer = new ConcurrentLinkedQueue<>();
this.omMetadataManager = omMetadataManager;

isRunning = true;
// Daemon thread which runs in back ground and flushes transactions to DB.
daemon = new Daemon(this::flushTransactions);
daemon.setName("OMDoubleBufferFlushThread");
daemon.start();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer to separate the daemon thread from the OM double buffer, so that this class will be more flexible to use.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your suggestion here is to Make Deamon thread logic in a separate class, and instantiate that class in OMDoubleBuffer and start the thread in OMDoubleBuffer?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I mean not to start flushTransactions thread in OMDoubleBuffer. It's would be better to let the caller of function flushTransactions outside of the class OMDoubleBuffer. How the flush transactions behavior should be triggered by outside not by itself I think.

Copy link
Contributor Author

@bharatviswa504 bharatviswa504 May 17, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My thought process of doing this way is, background flush will be continuously happening if currentBuffer has any entries so that we don't wait for the buffer to be full. So, that when a restart happens we don't need to apply the transactions again to OM DB. And in this way we keep disks busy.

In the other way as suggested if we do, when the buffer is full, swap and start flush, and keep further transactions adding to the currentBuffer, but if flush is taking much time, and this buffer happened to be full, we should wait until the first flush to be completed, and we might block all further transactions.


}

/**
* Runs in a background thread and batches the transaction in currentBuffer
* and commit to DB.
*/
private void flushTransactions() {
while(isRunning) {
try {
if (canFlush()) {
setReadyBuffer();
final BatchOperation batchOperation = omMetadataManager.getStore()
.initBatchOperation();

readyBuffer.iterator().forEachRemaining((entry) -> {
try {
entry.getResponse().addToDBBatch(omMetadataManager,
batchOperation);
} catch (IOException ex) {
// During Adding to RocksDB batch entry got an exception.
// We should terminate the OM.
terminate(ex);
}
});

omMetadataManager.getStore().commitBatchOperation(batchOperation);
int flushedTransactionsSize = readyBuffer.size();
flushedTransactionCount.addAndGet(flushedTransactionsSize);
flushIterations.incrementAndGet();

LOG.debug("Sync Iteration {} flushed transactions in this " +
"iteration{}", flushIterations.get(),
flushedTransactionsSize);
readyBuffer.clear();
// TODO: update the last updated index in OzoneManagerStateMachine.
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
if (isRunning) {
final String message = "OMDoubleBuffer flush thread " +
Thread.currentThread().getName() + " encountered Interrupted " +
"exception while running";
ExitUtils.terminate(1, message, ex, LOG);
} else {
LOG.info("OMDoubleBuffer flush thread " +
Thread.currentThread().getName() + " is interrupted and will " +
"exit. {}", Thread.currentThread().getName());
}
} catch (IOException ex) {
terminate(ex);
} catch (Throwable t) {
final String s = "OMDoubleBuffer flush thread" +
Thread.currentThread().getName() + "encountered Throwable error";
ExitUtils.terminate(2, s, t, LOG);
}
}
}

/**
* Stop OM DoubleBuffer flush thread.
*/
public synchronized void stop() {
if (isRunning) {
LOG.info("Stopping OMDoubleBuffer flush thread");
isRunning = false;
daemon.interrupt();
} else {
LOG.info("OMDoubleBuffer flush thread is not running.");
}

}

private void terminate(IOException ex) {
String message = "During flush to DB encountered error in " +
"OMDoubleBuffer flush thread " + Thread.currentThread().getName();
ExitUtils.terminate(1, message, ex, LOG);
}

/**
* Returns the flushed transaction count to OM DB.
* @return flushedTransactionCount
*/
public long getFlushedTransactionCount() {
return flushedTransactionCount.get();
}

/**
* Returns total number of flush iterations run by sync thread.
* @return flushIterations
*/
public long getFlushIterations() {
return flushIterations.get();
}

/**
* Add OmResponseBufferEntry to buffer.
* @param response
* @param transactionIndex
*/
public synchronized void add(OMClientResponse response,
long transactionIndex) {
currentBuffer.add(new DoubleBufferEntry<>(transactionIndex, response));
notify();
}

/**
* Check can we flush transactions or not. This method wait's until
* currentBuffer size is greater than zero, once currentBuffer size is
* greater than zero it gets notify signal, and it returns true
* indicating that we are ready to flush.
*
* @return boolean
*/
private synchronized boolean canFlush() throws InterruptedException {
// When transactions are added to buffer it notifies, then we check if
// currentBuffer size once and return from this method.
while (currentBuffer.size() == 0) {
wait(Long.MAX_VALUE);
}
return true;
}

/**
* Prepares the readyBuffer which is used by sync thread to flush
* transactions to OM DB. This method swaps the currentBuffer and readyBuffer.
*/
private synchronized void setReadyBuffer() {
Queue<DoubleBufferEntry<OMClientResponse>> temp = currentBuffer;
currentBuffer = readyBuffer;
readyBuffer = temp;
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.om.ratis.helpers;

import org.apache.hadoop.ozone.om.response.OMClientResponse;

/**
* Entry in OzoneManagerDouble Buffer.
* @param <Response>
*/
public class DoubleBufferEntry<Response extends OMClientResponse> {

private long trxLogIndex;
private Response response;

public DoubleBufferEntry(long trxLogIndex, Response response) {
this.trxLogIndex = trxLogIndex;
this.response = response;
}

public long getTrxLogIndex() {
return trxLogIndex;
}

public Response getResponse() {
return response;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
/**
* package which contains helper classes for each OM request response.
*/
package org.apache.hadoop.ozone.om.ratis.helpers;
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bharatviswa504 why is this change a part of the DoubleBuffer patch? This patch should just introduce the DoubleBuffer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added them for testing purpose to test DoubleBuffer Implementation.
Actual usage and the classes will be changed further according to needs of HA in further Jira.

In this PR #827 handled bucket related operations.

* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.om.response;

import java.io.IOException;

import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.utils.db.BatchOperation;

/**
* Response for CreateBucket request.
*/
public final class OMBucketCreateResponse implements OMClientResponse {

private final OmBucketInfo omBucketInfo;

public OMBucketCreateResponse(OmBucketInfo omBucketInfo) {
this.omBucketInfo = omBucketInfo;
}

@Override
public void addToDBBatch(OMMetadataManager omMetadataManager,
BatchOperation batchOperation) throws IOException {
String dbBucketKey =
omMetadataManager.getBucketKey(omBucketInfo.getVolumeName(),
omBucketInfo.getBucketName());
omMetadataManager.getBucketTable().putWithBatch(batchOperation, dbBucketKey,
omBucketInfo);
}

public OmBucketInfo getOmBucketInfo() {
return omBucketInfo;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here. This should be moved to a subsequent patch right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added them for testing purpose to test DoubleBuffer Implementation.
Actual usage and the classes will be changed further according to needs of HA in further Jira.

In this PR #827 handled bucket related operations.

* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.om.response;

import java.io.IOException;

import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.utils.db.BatchOperation;

/**
* Response for DeleteBucket request.
*/
public final class OMBucketDeleteResponse implements OMClientResponse {

private String volumeName;
private String bucketName;

public OMBucketDeleteResponse(
String volumeName, String bucketName) {
this.volumeName = volumeName;
this.bucketName = bucketName;
}

@Override
public void addToDBBatch(OMMetadataManager omMetadataManager,
BatchOperation batchOperation) throws IOException {
String dbBucketKey =
omMetadataManager.getBucketKey(volumeName, bucketName);
omMetadataManager.getBucketTable().deleteWithBatch(batchOperation,
dbBucketKey);
}

public String getVolumeName() {
return volumeName;
}

public String getBucketName() {
return bucketName;
}
}

Loading