diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/PartialTableCache.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/PartialTableCache.java index 4d3711269a168..5850f62495781 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/PartialTableCache.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/PartialTableCache.java @@ -58,12 +58,16 @@ public PartialTableCache() { @Override public CACHEVALUE get(CACHEKEY cachekey) { - return cache.get(cachekey); + synchronized (cache) { + return cache.get(cachekey); + } } @Override public void put(CACHEKEY cacheKey, CACHEVALUE value) { - cache.put(cacheKey, value); + synchronized (cache) { + cache.put(cacheKey, value); + } epochEntries.add(new EpochEntry<>(value.getEpoch(), cacheKey)); } @@ -83,14 +87,16 @@ private void evictCache(long epoch) { iterator.hasNext();) { currentEntry = iterator.next(); CACHEKEY cachekey = currentEntry.getCachekey(); - CacheValue cacheValue = cache.get(cachekey); - if (cacheValue.getEpoch() <= epoch) { - cache.remove(cachekey); - iterator.remove(); - } else { - // If currentEntry epoch is greater than epoch, we have deleted all - // entries less than specified epoch. So, we can break. - break; + synchronized (cache) { + CacheValue cacheValue = cache.get(cachekey); + if (cacheValue.getEpoch() <= epoch) { + cache.remove(cachekey); + iterator.remove(); + } else { + // If currentEntry epoch is greater than epoch, we have deleted all + // entries less than specified epoch. So, we can break. + break; + } } } } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java index 2c4418cb9ec4b..a7573132b4f36 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java @@ -195,6 +195,8 @@ public enum ResultCodes { FILE_ALREADY_EXISTS, - NOT_A_FILE + NOT_A_FILE, + + PERMISSION_DENIED } } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java index eb514d0961317..ad2bc316f5e19 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java @@ -18,8 +18,6 @@ package org.apache.hadoop.ozone.om.protocol; -import org.apache.hadoop.ozone.om.helpers.OmBucketArgs; -import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmDeleteVolumeResponse; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; @@ -168,54 +166,4 @@ void applySetOwner(String oldOwner, VolumeList oldOwnerVolumeList, */ void applyDeleteVolume(String volume, String owner, VolumeList newVolumeList) throws IOException; - - /** - * Start Create Bucket Transaction. - * @param omBucketInfo - * @return OmBucketInfo - * @throws IOException - */ - OmBucketInfo startCreateBucket(OmBucketInfo omBucketInfo) throws IOException; - - /** - * Apply Create Bucket Changes to OM DB. - * @param omBucketInfo - * @throws IOException - */ - void applyCreateBucket(OmBucketInfo omBucketInfo) throws IOException; - - /** - * Start Delete Bucket Transaction. - * @param volumeName - * @param bucketName - * @throws IOException - */ - void startDeleteBucket(String volumeName, String bucketName) - throws IOException; - - /** - * Apply Delete Bucket changes to OM DB. - * @param volumeName - * @param bucketName - * @throws IOException - */ - void applyDeleteBucket(String volumeName, String bucketName) - throws IOException; - - /** - * Start SetBucket Property Transaction. - * @param omBucketArgs - * @return OmBucketInfo - * @throws IOException - */ - OmBucketInfo startSetBucketProperty(OmBucketArgs omBucketArgs) - throws IOException; - - /** - * Apply SetBucket Property changes to OM DB. - * @param omBucketInfo - * @throws IOException - */ - void applySetBucketProperty(OmBucketInfo omBucketInfo) throws IOException; - } diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto index e82741bed5ec4..cc97a12a0672c 100644 --- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto +++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto @@ -97,6 +97,8 @@ message OMRequest { required string clientId = 3; + optional UgiInfo ugiInfo = 4; + optional CreateVolumeRequest createVolumeRequest = 11; optional SetVolumePropertyRequest setVolumePropertyRequest = 12; optional CheckVolumeAccessRequest checkVolumeAccessRequest = 13; @@ -258,8 +260,14 @@ enum Status { DIRECTORY_NOT_FOUND = 45; FILE_ALREADY_EXISTS = 46; NOT_A_FILE = 47; + + PERMISSION_DENIED = 48; } +message UgiInfo { + optional string userName = 1; + optional string remoteAddress = 3; +} message VolumeInfo { required string adminName = 1; @@ -370,7 +378,7 @@ message BucketInfo { repeated OzoneAclInfo acls = 3; required bool isVersionEnabled = 4 [default = false]; required StorageTypeProto storageType = 5 [default = DISK]; - required uint64 creationTime = 6; + optional uint64 creationTime = 6; repeated hadoop.hdds.KeyValue metadata = 7; optional BucketEncryptionInfoProto beinfo = 8; } @@ -490,11 +498,7 @@ message InfoBucketResponse { } message SetBucketPropertyRequest { - //TODO: See if we can merge bucketArgs and bucketInfo optional BucketArgs bucketArgs = 1; - // This will be set during startTransaction, and used to apply to OM DB - // during applyTransaction. - optional BucketInfo bucketInfo = 2; } message SetBucketPropertyResponse { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java index 39fada8959d14..5d739c2988a62 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java @@ -171,11 +171,11 @@ public void testBucketOps() throws IOException { Mockito.doNothing().when(mockS3Bm).deleteS3Bucket("random"); Mockito.doReturn(true).when(mockS3Bm).createOzoneVolumeIfNeeded(null); - Mockito.doReturn(null).when(mockBm).createBucket(null); - Mockito.doReturn(null).when(mockBm).createBucket(null); + Mockito.doNothing().when(mockBm).createBucket(null); + Mockito.doNothing().when(mockBm).createBucket(null); Mockito.doNothing().when(mockBm).deleteBucket(null, null); Mockito.doReturn(null).when(mockBm).getBucketInfo(null, null); - Mockito.doReturn(null).when(mockBm).setBucketProperty(null); + Mockito.doNothing().when(mockBm).setBucketProperty(null); Mockito.doReturn(null).when(mockBm).listBuckets(null, null, null, 0); HddsWhiteboxTestUtils.setInternalState( diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java index 422a02c0f4cc7..3c168b3241a07 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java @@ -405,7 +405,7 @@ private void createKeyTest(boolean checkSuccess) throws Exception { // last running OM as it would fail to get a quorum. if (e instanceof RemoteException) { GenericTestUtils.assertExceptionContains( - "RaftRetryFailureException", e); + "NotLeaderException", e); } } else { throw e; @@ -446,7 +446,7 @@ private void createVolumeTest(boolean checkSuccess) throws Exception { // last running OM as it would fail to get a quorum. if (e instanceof RemoteException) { GenericTestUtils.assertExceptionContains( - "RaftRetryFailureException", e); + "NotLeaderException", e); } } else { throw e; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManager.java index 460ac1157c34d..4417567d9b6db 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManager.java @@ -30,14 +30,7 @@ public interface BucketManager { * Creates a bucket. * @param bucketInfo - OmBucketInfo for creating bucket. */ - OmBucketInfo createBucket(OmBucketInfo bucketInfo) throws IOException; - - /** - * Apply Create Bucket changes to OM DB. - * @param omBucketInfo - * @throws IOException - */ - void applyCreateBucket(OmBucketInfo omBucketInfo) throws IOException; + void createBucket(OmBucketInfo bucketInfo) throws IOException; /** @@ -53,14 +46,7 @@ OmBucketInfo getBucketInfo(String volumeName, String bucketName) * @param args - BucketArgs. * @throws IOException */ - OmBucketInfo setBucketProperty(OmBucketArgs args) throws IOException; - - /** - * Apply SetBucket Property changes to OM DB. - * @param omBucketInfo - * @throws IOException - */ - void applySetBucketProperty(OmBucketInfo omBucketInfo) throws IOException; + void setBucketProperty(OmBucketArgs args) throws IOException; /** * Deletes an existing empty bucket from volume. @@ -70,15 +56,6 @@ OmBucketInfo getBucketInfo(String volumeName, String bucketName) */ void deleteBucket(String volumeName, String bucketName) throws IOException; - /** - * Apply Delete Bucket changes to OM DB. - * @param volumeName - * @param bucketName - * @throws IOException - */ - void applyDeleteBucket(String volumeName, String bucketName) - throws IOException; - /** * Returns a list of buckets represented by {@link OmBucketInfo} * in the given volume. diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java index 68cd2a9dd6232..0de189cc9b1a9 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java @@ -102,7 +102,7 @@ KeyProviderCryptoExtension getKMSProvider() { * @param bucketInfo - OmBucketInfo. */ @Override - public OmBucketInfo createBucket(OmBucketInfo bucketInfo) throws IOException { + public void createBucket(OmBucketInfo bucketInfo) throws IOException { Preconditions.checkNotNull(bucketInfo); String volumeName = bucketInfo.getVolumeName(); String bucketName = bucketInfo.getBucketName(); @@ -165,11 +165,8 @@ public OmBucketInfo createBucket(OmBucketInfo bucketInfo) throws IOException { } OmBucketInfo omBucketInfo = omBucketInfoBuilder.build(); - if (!isRatisEnabled) { - commitCreateBucketInfoToDB(omBucketInfo); - } + commitCreateBucketInfoToDB(omBucketInfo); LOG.debug("created bucket: {} in volume: {}", bucketName, volumeName); - return omBucketInfo; } catch (IOException | DBException ex) { if (!(ex instanceof OMException)) { LOG.error("Bucket creation failed for bucket:{} in volume:{}", @@ -182,18 +179,6 @@ public OmBucketInfo createBucket(OmBucketInfo bucketInfo) throws IOException { } } - - public void applyCreateBucket(OmBucketInfo omBucketInfo) throws IOException { - Preconditions.checkNotNull(omBucketInfo); - try { - commitCreateBucketInfoToDB(omBucketInfo); - } catch (IOException ex) { - LOG.error("Apply CreateBucket Failed for bucket: {}, volume: {}", - omBucketInfo.getBucketName(), omBucketInfo.getVolumeName(), ex); - throw ex; - } - } - private void commitCreateBucketInfoToDB(OmBucketInfo omBucketInfo) throws IOException { String dbBucketKey = @@ -243,7 +228,7 @@ public OmBucketInfo getBucketInfo(String volumeName, String bucketName) * @throws IOException - On Failure. */ @Override - public OmBucketInfo setBucketProperty(OmBucketArgs args) throws IOException { + public void setBucketProperty(OmBucketArgs args) throws IOException { Preconditions.checkNotNull(args); String volumeName = args.getVolumeName(); String bucketName = args.getBucketName(); @@ -296,11 +281,7 @@ public OmBucketInfo setBucketProperty(OmBucketArgs args) throws IOException { bucketInfoBuilder.setCreationTime(oldBucketInfo.getCreationTime()); OmBucketInfo omBucketInfo = bucketInfoBuilder.build(); - - if (!isRatisEnabled) { - commitSetBucketPropertyInfoToDB(omBucketInfo); - } - return omBucketInfo; + commitSetBucketPropertyInfoToDB(omBucketInfo); } catch (IOException | DBException ex) { if (!(ex instanceof OMException)) { LOG.error("Setting bucket property failed for bucket:{} in volume:{}", @@ -312,18 +293,6 @@ public OmBucketInfo setBucketProperty(OmBucketArgs args) throws IOException { } } - public void applySetBucketProperty(OmBucketInfo omBucketInfo) - throws IOException { - try { - commitSetBucketPropertyInfoToDB(omBucketInfo); - } catch (IOException ex) { - LOG.error("Apply SetBucket property failed for bucket:{} in " + - "volume:{}", omBucketInfo.getBucketName(), - omBucketInfo.getVolumeName(), ex); - throw ex; - } - } - private void commitSetBucketPropertyInfoToDB(OmBucketInfo omBucketInfo) throws IOException { commitCreateBucketInfoToDB(omBucketInfo); @@ -377,10 +346,7 @@ public void deleteBucket(String volumeName, String bucketName) throw new OMException("Bucket is not empty", OMException.ResultCodes.BUCKET_NOT_EMPTY); } - - if (!isRatisEnabled) { - commitDeleteBucketInfoToOMDB(bucketKey); - } + commitDeleteBucketInfoToOMDB(bucketKey); } catch (IOException ex) { if (!(ex instanceof OMException)) { LOG.error("Delete bucket failed for bucket:{} in volume:{}", bucketName, @@ -392,20 +358,6 @@ public void deleteBucket(String volumeName, String bucketName) } } - public void applyDeleteBucket(String volumeName, String bucketName) - throws IOException { - Preconditions.checkNotNull(volumeName); - Preconditions.checkNotNull(bucketName); - try { - commitDeleteBucketInfoToOMDB(metadataManager.getBucketKey(volumeName, - bucketName)); - } catch (IOException ex) { - LOG.error("Apply DeleteBucket Failed for bucket: {}, volume: {}", - bucketName, volumeName, ex); - throw ex; - } - } - private void commitDeleteBucketInfoToOMDB(String dbBucketKey) throws IOException { metadataManager.getBucketTable().delete(dbBucketKey); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index ec51fe77c04d0..24eb8d2f71bef 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -24,6 +24,7 @@ import com.google.common.base.Preconditions; import com.google.protobuf.BlockingService; +import java.net.InetAddress; import java.security.PrivateKey; import java.security.PublicKey; import java.security.KeyPair; @@ -313,6 +314,7 @@ private OzoneManager(OzoneConfiguration conf) throws IOException, RPC.setProtocolEngine(configuration, OzoneManagerProtocolPB.class, ProtobufRpcEngine.class); + metadataManager = new OmMetadataManagerImpl(configuration); startRatisServer(); startRatisClient(); @@ -325,7 +327,6 @@ private OzoneManager(OzoneConfiguration conf) throws IOException, secConfig = new SecurityConfig(configuration); - metadataManager = new OmMetadataManagerImpl(configuration); volumeManager = new VolumeManagerImpl(metadataManager, configuration); // Create the KMS Key Provider @@ -1270,7 +1271,7 @@ private RPC.Server getRpcServer(OzoneConfiguration conf) throws IOException { BlockingService omService = newReflectiveBlockingService( new OzoneManagerProtocolServerSideTranslatorPB(this, omRatisServer, - omRatisClient, isRatisEnabled)); + isRatisEnabled)); return startRpcServer(configuration, omNodeRpcAddr, OzoneManagerProtocolPB.class, omService, handlerCount); @@ -1709,67 +1710,6 @@ public void applyDeleteVolume(String volume, String owner, volumeManager.applyDeleteVolume(volume, owner, newVolumeList); } - - @Override - public OmBucketInfo startCreateBucket(OmBucketInfo omBucketInfo) - throws IOException { - Preconditions.checkNotNull(omBucketInfo); - if(isAclEnabled) { - checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.CREATE, - omBucketInfo.getVolumeName(), omBucketInfo.getBucketName(), null); - } - - return bucketManager.createBucket(omBucketInfo); - } - - @Override - public void applyCreateBucket(OmBucketInfo omBucketInfo) throws IOException { - // TODO: Need to add metrics and Audit log for HA requests - bucketManager.applyCreateBucket(omBucketInfo); - } - - - @Override - public void startDeleteBucket(String volumeName, String bucketName) - throws IOException { - // TODO: Need to add metrics and Audit log for HA requests - if(isAclEnabled) { - checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.CREATE, - volumeName, bucketName, null); - } - - bucketManager.deleteBucket(volumeName, bucketName); - } - - - @Override - public void applyDeleteBucket(String volumeName, String bucketName) - throws IOException { - // TODO: Need to add metrics and Audit log for HA requests - bucketManager.applyDeleteBucket(volumeName, bucketName); - } - - - @Override - public OmBucketInfo startSetBucketProperty(OmBucketArgs omBucketArgs) - throws IOException { - Preconditions.checkNotNull(omBucketArgs); - // TODO: Need to add metrics and Audit log for HA requests - if(isAclEnabled) { - checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.CREATE, - omBucketArgs.getVolumeName(), omBucketArgs.getBucketName(), null); - } - return bucketManager.setBucketProperty(omBucketArgs); - } - - - @Override - public void applySetBucketProperty(OmBucketInfo omBucketInfo) - throws IOException { - // TODO: Need to add metrics and Audit log for HA requests - bucketManager.applySetBucketProperty(omBucketInfo); - } - /** * Checks if current caller has acl permissions. * @@ -1784,32 +1724,59 @@ public void applySetBucketProperty(OmBucketInfo omBucketInfo) private void checkAcls(ResourceType resType, StoreType store, ACLType acl, String vol, String bucket, String key) throws OzoneAclException { - if(!isAclEnabled) { - return; - } + checkAcls(resType, store, acl, vol, bucket, key, + ProtobufRpcEngine.Server.getRemoteUser(), + ProtobufRpcEngine.Server.getRemoteIp()); + } + /** + * CheckAcls for the ozone object. + * @param resType + * @param storeType + * @param aclType + * @param vol + * @param bucket + * @param key + * @param ugi + * @param remoteAddress + * @throws OzoneAclException + */ + @SuppressWarnings("parameternumber") + public void checkAcls(ResourceType resType, StoreType storeType, + ACLType aclType, String vol, String bucket, String key, + UserGroupInformation ugi, InetAddress remoteAddress) + throws OzoneAclException { OzoneObj obj = OzoneObjInfo.Builder.newBuilder() .setResType(resType) - .setStoreType(store) + .setStoreType(storeType) .setVolumeName(vol) .setBucketName(bucket) .setKeyName(key).build(); - UserGroupInformation user = ProtobufRpcEngine.Server.getRemoteUser(); + RequestContext context = RequestContext.newBuilder() - .setClientUgi(user) - .setIp(ProtobufRpcEngine.Server.getRemoteIp()) + .setClientUgi(ugi) + .setIp(remoteAddress) .setAclType(ACLIdentityType.USER) - .setAclRights(acl) + .setAclRights(aclType) .build(); if (!accessAuthorizer.checkAccess(obj, context)) { LOG.warn("User {} doesn't have {} permission to access {}", - user.getUserName(), acl, resType); - throw new OzoneAclException("User " + user.getUserName() + " doesn't " + - "have " + acl + " permission to access " + resType, + ugi.getUserName(), aclType, resType); + throw new OzoneAclException("User " + ugi.getUserName() + " doesn't " + + "have " + aclType + " permission to access " + resType, ErrorCode.PERMISSION_DENIED); } } + /** + * + * Return true if Ozone acl's are enabled, else false. + * @return boolean + */ + public boolean getAclsEnabled() { + return isAclEnabled; + } + /** * Changes the owner of a volume. * @@ -2452,8 +2419,10 @@ public void setBucketProperty(OmBucketArgs args) */ @Override public void deleteBucket(String volume, String bucket) throws IOException { - checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.WRITE, volume, - bucket, null); + if (isAclEnabled) { + checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.WRITE, volume, + bucket, null); + } Map auditMap = buildAuditMap(volume); auditMap.put(OzoneConsts.BUCKET, bucket); try { @@ -3037,4 +3006,8 @@ public String getComponent() { public OMFailoverProxyProvider getOMFailoverProxyProvider() { return null; } + + public OMMetrics getOmMetrics() { + return metrics; + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3BucketManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3BucketManagerImpl.java index 87fc8cd99b040..c234266f887e2 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3BucketManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3BucketManagerImpl.java @@ -137,13 +137,7 @@ public void deleteS3Bucket(String bucketName) throws IOException { OMException.ResultCodes.S3_BUCKET_NOT_FOUND); } - if (isRatisEnabled) { - bucketManager.deleteBucket(getOzoneVolumeName(bucketName), bucketName); - bucketManager.applyDeleteBucket(getOzoneVolumeName(bucketName), - bucketName); - } else { - bucketManager.deleteBucket(getOzoneVolumeName(bucketName), bucketName); - } + bucketManager.deleteBucket(getOzoneVolumeName(bucketName), bucketName); omMetadataManager.getS3Table().delete(bucketName); } catch(IOException ex) { throw ex; @@ -202,11 +196,7 @@ private void createOzoneBucket(String volumeName, String bucketName) .setIsVersionEnabled(Boolean.FALSE) .setStorageType(StorageType.DEFAULT) .build(); - if (isRatisEnabled) { - bucketManager.applyCreateBucket(bucketManager.createBucket(bucketInfo)); - } else { - bucketManager.createBucket(bucketInfo); - } + bucketManager.createBucket(bucketInfo); } @Override diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java new file mode 100644 index 0000000000000..dc9a2e5b74d5c --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.ratis; + +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.ratis.helpers.DoubleBufferEntry; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.utils.db.BatchOperation; + +import org.apache.ratis.util.ExitUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class implements DoubleBuffer implementation of OMClientResponse's. In + * DoubleBuffer it has 2 buffers one is currentBuffer and other is + * readyBuffer. The current OM requests will be always added to currentBuffer. + * Flush thread will be running in background, it check's if currentBuffer has + * any entries, it swaps the buffer and creates a batch and commit to DB. + * Adding OM request to doubleBuffer and swap of buffer are synchronized + * methods. + * + */ +public class OzoneManagerDoubleBuffer { + + private static final Logger LOG = + LoggerFactory.getLogger(OzoneManagerDoubleBuffer.class); + + // Taken unbounded queue, if sync thread is taking too long time, we + // might end up taking huge memory to add entries to the buffer. + // TODO: We can avoid this using unbounded queue and use queue with + // capacity, if queue is full we can wait for sync to be completed to + // add entries. But in this also we might block rpc handlers, as we + // clear entries after sync. Or we can come up with a good approach to + // solve this. + private Queue> currentBuffer; + private Queue> readyBuffer; + + private Daemon daemon; + private final OMMetadataManager omMetadataManager; + private final AtomicLong flushedTransactionCount = new AtomicLong(0); + private final AtomicLong flushIterations = new AtomicLong(0); + private volatile boolean isRunning; + private OzoneManagerRatisSnapshot ratisSnapshot; + + + public OzoneManagerDoubleBuffer(OMMetadataManager omMetadataManager, + OzoneManagerRatisSnapshot ratisSnapshot) { + this.currentBuffer = new ConcurrentLinkedQueue<>(); + this.readyBuffer = new ConcurrentLinkedQueue<>(); + this.omMetadataManager = omMetadataManager; + this.ratisSnapshot = ratisSnapshot; + + isRunning = true; + // Daemon thread which runs in back ground and flushes transactions to DB. + daemon = new Daemon(this::flushTransactions); + daemon.setName("OMDoubleBufferFlushThread"); + daemon.start(); + + } + + /** + * Runs in a background thread and batches the transaction in currentBuffer + * and commit to DB. + */ + private void flushTransactions() { + while(isRunning) { + try { + if (canFlush()) { + setReadyBuffer(); + final BatchOperation batchOperation = omMetadataManager.getStore() + .initBatchOperation(); + + readyBuffer.iterator().forEachRemaining((entry) -> { + try { + entry.getResponse().addToDBBatch(omMetadataManager, + batchOperation); + } catch (IOException ex) { + // During Adding to RocksDB batch entry got an exception. + // We should terminate the OM. + terminate(ex); + } + }); + + omMetadataManager.getStore().commitBatchOperation(batchOperation); + int flushedTransactionsSize = readyBuffer.size(); + flushedTransactionCount.addAndGet(flushedTransactionsSize); + flushIterations.incrementAndGet(); + + LOG.debug("Sync Iteration {} flushed transactions in this " + + "iteration{}", flushIterations.get(), + flushedTransactionsSize); + + long lastUpdatedIndex = + readyBuffer.stream().map(DoubleBufferEntry::getTrxLogIndex) + .max(Long::compareTo).get(); + readyBuffer.clear(); + + // Update last update index once after buffer flush, so when OM + // restarts we don't apply all the transactions which are + // already flushed. As ratis take's snapshot periodically after + // 400k transactions(current default value). + try { + ratisSnapshot.takeSnapshot(lastUpdatedIndex); + } catch (IOException ex) { + // We don't need to terminate OM when taking snapshot fails. or + // Do we need to terminate? + LOG.error("Error occurred during take Snapshot", ex); + } + + //TODO: clear cache + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + if (isRunning) { + final String message = "OMDoubleBuffer flush thread " + + Thread.currentThread().getName() + " encountered Interrupted " + + "exception while running"; + ExitUtils.terminate(1, message, ex, LOG); + } else { + LOG.info("OMDoubleBuffer flush thread " + + Thread.currentThread().getName() + " is interrupted and will " + + "exit. {}", Thread.currentThread().getName()); + } + } catch (IOException ex) { + terminate(ex); + } catch (Throwable t) { + final String s = "OMDoubleBuffer flush thread" + + Thread.currentThread().getName() + "encountered Throwable error"; + ExitUtils.terminate(2, s, t, LOG); + } + } + } + + /** + * Stop OM DoubleBuffer flush thread. + */ + public synchronized void stop() { + if (isRunning) { + LOG.info("Stopping OMDoubleBuffer flush thread"); + isRunning = false; + daemon.interrupt(); + } else { + LOG.info("OMDoubleBuffer flush thread is not running."); + } + + } + + private void terminate(IOException ex) { + String message = "During flush to DB encountered error in " + + "OMDoubleBuffer flush thread " + Thread.currentThread().getName(); + ExitUtils.terminate(1, message, ex, LOG); + } + + /** + * Returns the flushed transaction count to OM DB. + * @return flushedTransactionCount + */ + public long getFlushedTransactionCount() { + return flushedTransactionCount.get(); + } + + /** + * Returns total number of flush iterations run by sync thread. + * @return flushIterations + */ + public long getFlushIterations() { + return flushIterations.get(); + } + + /** + * Add OmResponseBufferEntry to buffer. + * @param response + * @param transactionIndex + */ + public synchronized void add(OMClientResponse response, + long transactionIndex) { + currentBuffer.add(new DoubleBufferEntry<>(transactionIndex, response)); + notify(); + } + + /** + * Check can we flush transactions or not. This method wait's until + * currentBuffer size is greater than zero, once currentBuffer size is + * greater than zero it gets notify signal, and it returns true + * indicating that we are ready to flush. + * + * @return boolean + */ + private synchronized boolean canFlush() throws InterruptedException { + // When transactions are added to buffer it notifies, then we check if + // currentBuffer size once and return from this method. + while (currentBuffer.size() == 0) { + wait(Long.MAX_VALUE); + } + return true; + } + + /** + * Prepares the readyBuffer which is used by sync thread to flush + * transactions to OM DB. This method swaps the currentBuffer and readyBuffer. + */ + private synchronized void setReadyBuffer() { + Queue> temp = currentBuffer; + currentBuffer = readyBuffer; + readyBuffer = temp; + } + +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java index b16f9f23ca313..2f1855368badf 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java @@ -35,12 +35,20 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.ServiceException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.scm.HddsServerUtil; import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OMNodeDetails; -import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMResponse; import org.apache.ratis.RaftConfigKeys; import org.apache.ratis.client.RaftClientConfigKeys; import org.apache.ratis.conf.RaftProperties; @@ -51,10 +59,15 @@ import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.GroupInfoReply; import org.apache.ratis.protocol.GroupInfoRequest; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.NotLeaderException; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.protocol.StateMachineException; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.RaftServer; @@ -66,6 +79,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.ozone.om.exceptions.OMException.STATUS_CODE; + /** * Creates a Ratis server endpoint for OM. */ @@ -80,7 +95,7 @@ public final class OzoneManagerRatisServer { private final RaftGroup raftGroup; private final RaftPeerId raftPeerId; - private final OzoneManagerServerProtocol ozoneManager; + private final OzoneManager ozoneManager; private final OzoneManagerStateMachine omStateMachine; private final ClientId clientId = ClientId.randomId(); @@ -97,6 +112,100 @@ private static long nextCallId() { return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE; } + /** + * Submit request to Ratis server. + * @param omRequest + * @return OMResponse - response returned to the client. + * @throws ServiceException + */ + public OMResponse submitRequest(OMRequest omRequest) throws ServiceException { + RaftClientRequest raftClientRequest = + createWriteRaftClientRequest(omRequest); + RaftClientReply raftClientReply; + try { + raftClientReply = server.submitClientRequestAsync(raftClientRequest) + .get(); + } catch (Exception ex) { + throw new ServiceException(ex.getMessage(), ex); + } + + return processReply(omRequest, raftClientReply); + } + + /** + * Create Write RaftClient request from OMRequest. + * @param omRequest + * @return + */ + private RaftClientRequest createWriteRaftClientRequest(OMRequest omRequest) { + return new RaftClientRequest(clientId, server.getId(), raftGroupId, + nextCallId(), + Message.valueOf(OMRatisHelper.convertRequestToByteString(omRequest)), + RaftClientRequest.writeRequestType(), null); + } + + /** + * Process the raftClientReply and return OMResponse. + * @param omRequest + * @param reply + * @return + * @throws ServiceException + */ + private OMResponse processReply(OMRequest omRequest, RaftClientReply reply) + throws ServiceException { + // NotLeader exception is thrown only when the raft server to which the + // request is submitted is not the leader. This can happen first time + // when client is submitting request to OM. + NotLeaderException notLeaderException = reply.getNotLeaderException(); + if (notLeaderException != null) { + throw new ServiceException(notLeaderException); + } + StateMachineException stateMachineException = + reply.getStateMachineException(); + if (stateMachineException != null) { + OMResponse.Builder omResponse = OMResponse.newBuilder(); + omResponse.setCmdType(omRequest.getCmdType()); + omResponse.setSuccess(false); + omResponse.setMessage(stateMachineException.getCause().getMessage()); + omResponse.setStatus(parseErrorStatus( + stateMachineException.getCause().getMessage())); + return omResponse.build(); + } + + try { + return OMRatisHelper.getOMResponseFromRaftClientReply(reply); + } catch (InvalidProtocolBufferException ex) { + if (ex.getMessage() != null) { + throw new ServiceException(ex.getMessage(), ex); + } else { + throw new ServiceException(ex); + } + } + + // TODO: Still need to handle RaftRetry failure exception. + } + + /** + * Parse errorMessage received from the exception and convert to + * {@link OzoneManagerProtocolProtos.Status}. + * @param errorMessage + * @return + */ + private OzoneManagerProtocolProtos.Status parseErrorStatus( + String errorMessage) { + if (errorMessage.contains(STATUS_CODE)) { + String errorCode = errorMessage.substring( + errorMessage.indexOf(STATUS_CODE) + STATUS_CODE.length()); + LOG.debug("Parsing error message for error code " + + errorCode); + return OzoneManagerProtocolProtos.Status.valueOf(errorCode.trim()); + } else { + return OzoneManagerProtocolProtos.Status.INTERNAL_ERROR; + } + + } + + /** * Returns an OM Ratis server. * @param conf configuration @@ -108,7 +217,7 @@ private static long nextCallId() { * @throws IOException */ private OzoneManagerRatisServer(Configuration conf, - OzoneManagerServerProtocol om, + OzoneManager om, String raftGroupIdStr, RaftPeerId localRaftPeerId, InetSocketAddress addr, List raftPeers) throws IOException { @@ -157,7 +266,7 @@ public void run() { * Creates an instance of OzoneManagerRatisServer. */ public static OzoneManagerRatisServer newOMRatisServer( - Configuration ozoneConf, OzoneManagerServerProtocol omProtocol, + Configuration ozoneConf, OzoneManager omProtocol, OMNodeDetails omNodeDetails, List peerNodes) throws IOException { @@ -202,7 +311,7 @@ private OzoneManagerStateMachine getStateMachine() { return new OzoneManagerStateMachine(this); } - public OzoneManagerServerProtocol getOzoneManager() { + public OzoneManager getOzoneManager() { return ozoneManager; } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisSnapshot.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisSnapshot.java new file mode 100644 index 0000000000000..9dfdbdb12aa50 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisSnapshot.java @@ -0,0 +1,11 @@ +package org.apache.hadoop.ozone.om.ratis; + +import java.io.IOException; + +/** + * Functional interface for OM RatisSnapshot. + */ +public interface OzoneManagerRatisSnapshot { + + long takeSnapshot(long index) throws IOException; +} \ No newline at end of file diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java index 02b8b88e10800..085b4d997b638 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java @@ -29,7 +29,6 @@ import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; -import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .MultipartInfoApplyInitiateRequest; @@ -67,15 +66,20 @@ public class OzoneManagerStateMachine extends BaseStateMachine { private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); private final OzoneManagerRatisServer omRatisServer; - private final OzoneManagerServerProtocol ozoneManager; + private final OzoneManager ozoneManager; private OzoneManagerHARequestHandler handler; private RaftGroupId raftGroupId; private long lastAppliedIndex = 0; + private final OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer; public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) { this.omRatisServer = ratisServer; this.ozoneManager = omRatisServer.getOzoneManager(); - this.handler = new OzoneManagerHARequestHandlerImpl(ozoneManager); + this.ozoneManagerDoubleBuffer = + new OzoneManagerDoubleBuffer(ozoneManager.getMetadataManager(), + this::takeSnapshot); + this.handler = new OzoneManagerHARequestHandlerImpl(ozoneManager, + ozoneManagerDoubleBuffer); } /** @@ -168,6 +172,11 @@ public long takeSnapshot() throws IOException { return 0; } + public long takeSnapshot(long appliedIndex) throws IOException { + this.lastAppliedIndex = appliedIndex; + return takeSnapshot(); + } + /** * Notifies the state machine that the raft peer is no longer leader. */ @@ -192,9 +201,6 @@ private TransactionContext handleStartTransactionRequests( case CreateVolume: case SetVolumeProperty: case DeleteVolume: - case CreateBucket: - case SetBucketProperty: - case DeleteBucket: newOmRequest = handler.handleStartTransaction(omRequest); break; case AllocateBlock: @@ -403,7 +409,7 @@ private IOException constructExceptionForFailedRequest( * @throws ServiceException */ private Message runCommand(OMRequest request, long trxLogIndex) { - OMResponse response = handler.handleApplyTransaction(request); + OMResponse response = handler.handleApplyTransaction(request, trxLogIndex); lastAppliedIndex = trxLogIndex; return OMRatisHelper.convertResponseToMessage(response); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/helpers/DoubleBufferEntry.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/helpers/DoubleBufferEntry.java new file mode 100644 index 0000000000000..cd4c5ae8b25e5 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/helpers/DoubleBufferEntry.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.ratis.helpers; + +import org.apache.hadoop.ozone.om.response.OMClientResponse; + +/** + * Entry in OzoneManagerDouble Buffer. + * @param + */ +public class DoubleBufferEntry { + + private long trxLogIndex; + private Response response; + + public DoubleBufferEntry(long trxLogIndex, Response response) { + this.trxLogIndex = trxLogIndex; + this.response = response; + } + + public long getTrxLogIndex() { + return trxLogIndex; + } + + public Response getResponse() { + return response; + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/helpers/package-info.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/helpers/package-info.java new file mode 100644 index 0000000000000..b12a324d681c0 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/helpers/package-info.java @@ -0,0 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +/** + * package which contains helper classes for each OM request response. + */ +package org.apache.hadoop.ozone.om.ratis.helpers; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java new file mode 100644 index 0000000000000..1b2f3abff29aa --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.om.ratis.utils; + +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.request.OMBucketCreateRequest; +import org.apache.hadoop.ozone.om.request.OMBucketDeleteRequest; +import org.apache.hadoop.ozone.om.request.OMBucketSetPropertyRequest; +import org.apache.hadoop.ozone.om.request.OMClientRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; +import org.apache.hadoop.ozone.security.acl.OzoneAclException; + +import java.io.IOException; + +/** + * Utility class used by OzoneManager HA. + */ +public final class OzoneManagerRatisUtils { + + private OzoneManagerRatisUtils() { + } + /** + * Create OMClientRequest which enacpsulates the OMRequest. + * @param omRequest + * @return OMClientRequest + * @throws IOException + */ + public static OMClientRequest createClientRequest(OMRequest omRequest) + throws IOException { + Type cmdType = omRequest.getCmdType(); + switch (cmdType) { + case CreateBucket: + return new OMBucketCreateRequest(omRequest); + case DeleteBucket: + return new OMBucketDeleteRequest(omRequest); + case SetBucketProperty: + return new OMBucketSetPropertyRequest(omRequest); + default: + // TODO: will update once all request types are implemented. + return null; + } + } + + /** + * Convert exception result to OzoneManagerProtocolProtos.Status. + * @param exception + * @return OzoneManagerProtocolProtos.Status + */ + public static Status exceptionToResponseStatus(IOException exception) { + if (exception instanceof OMException | + exception instanceof OzoneAclException) { + return Status.values()[((OMException) exception).getResult().ordinal()]; + } else { + return Status.INTERNAL_ERROR; + } + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/package-info.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/package-info.java new file mode 100644 index 0000000000000..94fd0c89565fb --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.om.ratis.utils; + +/** + * Utility class used by OzoneManager HA. + */ diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMBucketCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMBucketCreateRequest.java new file mode 100644 index 0000000000000..e3ca3cd61d6c5 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMBucketCreateRequest.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.request; + +import java.io.IOException; + +import com.google.common.base.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.crypto.CipherSuite; +import org.apache.hadoop.crypto.key.KeyProvider; +import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OMMetrics; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.response.OMBucketCreateResponse; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .BucketEncryptionInfoProto; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .CreateBucketRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .CreateBucketResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .BucketInfo; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMResponse; +import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer; +import org.apache.hadoop.ozone.security.acl.OzoneObj; +import org.apache.hadoop.ozone.protocolPB.OMPBHelper; +import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.utils.db.cache.CacheKey; +import org.apache.hadoop.utils.db.cache.CacheValue; + + + +import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .CryptoProtocolVersionProto.ENCRYPTION_ZONES; + +/** + * Handles CreateBucket Request. + */ +public class OMBucketCreateRequest extends OMClientRequest { + private static final Logger LOG = + LoggerFactory.getLogger(OMBucketCreateRequest.class); + + public OMBucketCreateRequest(OMRequest omRequest) { + super(omRequest); + } + @Override + public OMRequest preExecute(OzoneManager ozoneManager) throws IOException { + + // Get original request. + CreateBucketRequest createBucketRequest = + getOmRequest().getCreateBucketRequest(); + BucketInfo bucketInfo = createBucketRequest.getBucketInfo(); + + // Get KMS provider. + KeyProviderCryptoExtension kmsProvider = + ozoneManager.getKmsProvider(); + + // Create new Bucket request with new bucket info. + CreateBucketRequest.Builder newCreateBucketRequest = + createBucketRequest.toBuilder(); + + BucketInfo.Builder newBucketInfo = bucketInfo.toBuilder(); + + newCreateBucketRequest.setBucketInfo( + newBucketInfo.setCreationTime(Time.now())); + + if (bucketInfo.hasBeinfo()) { + newBucketInfo.setBeinfo(getBeinfo(kmsProvider, bucketInfo)); + } + + newCreateBucketRequest.setBucketInfo(newBucketInfo.build()); + return getOmRequest().toBuilder().setUgiInfo(getUgiInfo()) + .setCreateBucketRequest(newCreateBucketRequest.build()).build(); + } + + @Override + public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, + long transactionLogIndex) { + OMMetrics omMetrics = ozoneManager.getMetrics(); + omMetrics.incNumBucketCreates(); + + OMMetadataManager metadataManager = ozoneManager.getMetadataManager(); + + BucketInfo bucketInfo = getBucketInfoFromRequest(); + + String volumeName = bucketInfo.getVolumeName(); + String bucketName = bucketInfo.getBucketName(); + + OMResponse.Builder omResponse = OMResponse.newBuilder().setCmdType( + OzoneManagerProtocolProtos.Type.CreateBucket).setStatus( + OzoneManagerProtocolProtos.Status.OK); + OmBucketInfo omBucketInfo = null; + + try { + + // check Acl + if (ozoneManager.getAclsEnabled()) { + checkAcls(ozoneManager, OzoneObj.ResourceType.BUCKET, + OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.CREATE, + volumeName, bucketName, null); + } + + // acquire locks + metadataManager.getLock().acquireVolumeLock(volumeName); + metadataManager.getLock().acquireBucketLock(volumeName, bucketName); + + String volumeKey = metadataManager.getVolumeKey(volumeName); + String bucketKey = metadataManager.getBucketKey(volumeName, bucketName); + + //Check if the volume exists + if (metadataManager.getVolumeTable().get(volumeKey) == null) { + LOG.debug("volume: {} not found ", volumeName); + throw new OMException("Volume doesn't exist", + OMException.ResultCodes.VOLUME_NOT_FOUND); + } + //Check if bucket already exists + if (metadataManager.getBucketTable().get(bucketKey) != null) { + LOG.debug("bucket: {} already exists ", bucketName); + throw new OMException("Bucket already exist", + OMException.ResultCodes.BUCKET_ALREADY_EXISTS); + } + + omBucketInfo = OmBucketInfo.getFromProtobuf(bucketInfo); + LOG.debug("created bucket: {} in volume: {}", bucketName, volumeName); + omMetrics.incNumBuckets(); + + // Update table cache. + metadataManager.getBucketTable().addCacheEntry(new CacheKey<>(bucketKey), + new CacheValue<>(Optional.of(omBucketInfo), transactionLogIndex)); + + } catch (IOException ex) { + omMetrics.incNumBucketCreateFails(); + LOG.error("Bucket creation failed for bucket:{} in volume:{}", + bucketName, volumeName, ex); + omResponse.setStatus( + OzoneManagerRatisUtils.exceptionToResponseStatus(ex)); + omResponse.setMessage(ex.getMessage()); + omResponse.setSuccess(false); + } finally { + metadataManager.getLock().releaseBucketLock(volumeName, bucketName); + metadataManager.getLock().releaseVolumeLock(volumeName); + } + omResponse.setCreateBucketResponse( + CreateBucketResponse.newBuilder().build()); + return new OMBucketCreateResponse(omBucketInfo, omResponse.build()); + } + + + private BucketInfo getBucketInfoFromRequest() { + CreateBucketRequest createBucketRequest = + getOmRequest().getCreateBucketRequest(); + return createBucketRequest.getBucketInfo(); + } + + private BucketEncryptionInfoProto getBeinfo( + KeyProviderCryptoExtension kmsProvider, BucketInfo bucketInfo) + throws IOException { + BucketEncryptionInfoProto bek = bucketInfo.getBeinfo(); + BucketEncryptionInfoProto.Builder bekb = null; + if (kmsProvider == null) { + throw new OMException("Invalid KMS provider, check configuration " + + CommonConfigurationKeys.HADOOP_SECURITY_KEY_PROVIDER_PATH, + OMException.ResultCodes.INVALID_KMS_PROVIDER); + } + if (bek.getKeyName() == null) { + throw new OMException("Bucket encryption key needed.", OMException + .ResultCodes.BUCKET_ENCRYPTION_KEY_NOT_FOUND); + } + // Talk to KMS to retrieve the bucket encryption key info. + KeyProvider.Metadata metadata = kmsProvider.getMetadata( + bek.getKeyName()); + if (metadata == null) { + throw new OMException("Bucket encryption key " + bek.getKeyName() + + " doesn't exist.", + OMException.ResultCodes.BUCKET_ENCRYPTION_KEY_NOT_FOUND); + } + // If the provider supports pool for EDEKs, this will fill in the pool + kmsProvider.warmUpEncryptedKeys(bek.getKeyName()); + bekb = BucketEncryptionInfoProto.newBuilder() + .setKeyName(bek.getKeyName()) + .setCryptoProtocolVersion(ENCRYPTION_ZONES) + .setSuite(OMPBHelper.convert( + CipherSuite.convert(metadata.getCipher()))); + return bekb.build(); + } +} \ No newline at end of file diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMBucketDeleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMBucketDeleteRequest.java new file mode 100644 index 0000000000000..9cd9a2c2db98b --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMBucketDeleteRequest.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.request; + +import java.io.IOException; + +import com.google.common.base.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OMMetrics; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.response.OMBucketDeleteResponse; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMResponse; +import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; +import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer; +import org.apache.hadoop.ozone.security.acl.OzoneObj; +import org.apache.hadoop.utils.db.cache.CacheKey; +import org.apache.hadoop.utils.db.cache.CacheValue; + +/** + * Handles DeleteBucket Request. + */ +public class OMBucketDeleteRequest extends OMClientRequest { + private static final Logger LOG = + LoggerFactory.getLogger(OMBucketDeleteRequest.class); + + public OMBucketDeleteRequest(OMRequest omRequest) { + super(omRequest); + } + + @Override + public OMRequest preExecute(OzoneManager ozoneManager) throws IOException { + return getOmRequest().toBuilder().setUgiInfo(getUgiInfo()).build(); + } + + @Override + public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, + long transactionLogIndex) { + OMMetrics omMetrics = ozoneManager.getMetrics(); + omMetrics.incNumBucketDeletes(); + OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager(); + + OMRequest omRequest = getOmRequest(); + + String volumeName = omRequest.getDeleteBucketRequest().getVolumeName(); + String bucketName = omRequest.getDeleteBucketRequest().getBucketName(); + + // Generate end user response + OMResponse.Builder omResponse = OMResponse.newBuilder() + .setStatus(OzoneManagerProtocolProtos.Status.OK) + .setCmdType(omRequest.getCmdType()); + + try { + + // check Acl + if (ozoneManager.getAclsEnabled()) { + checkAcls(ozoneManager, OzoneObj.ResourceType.BUCKET, + OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE, + volumeName, bucketName, null); + } + + // acquire lock + omMetadataManager.getLock().acquireBucketLock(volumeName, bucketName); + + // No need to check volume exists here, as bucket cannot be created + // with out volume creation. + //Check if bucket exists + String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName); + OmBucketInfo omBucketInfo = + omMetadataManager.getBucketTable().get(bucketKey); + if (omBucketInfo == null) { + LOG.debug("bucket: {} not found ", bucketName); + throw new OMException("Bucket doesn't exist", + OMException.ResultCodes.BUCKET_NOT_FOUND); + } + //Check if bucket is empty + if (!omMetadataManager.isBucketEmpty(volumeName, bucketName)) { + LOG.debug("bucket: {} is not empty ", bucketName); + throw new OMException("Bucket is not empty", + OMException.ResultCodes.BUCKET_NOT_EMPTY); + } + omMetrics.decNumBuckets(); + + // Update table cache. + omMetadataManager.getBucketTable().addCacheEntry( + new CacheKey<>(bucketKey), + new CacheValue<>(Optional.absent(), transactionLogIndex)); + } catch (IOException ex) { + omMetrics.incNumBucketDeleteFails(); + LOG.error("Delete bucket failed for bucket:{} in volume:{}", bucketName, + volumeName, ex); + omResponse.setSuccess(false).setMessage(ex.getMessage()) + .setStatus(OzoneManagerRatisUtils.exceptionToResponseStatus(ex)); + } finally { + omMetadataManager.getLock().releaseBucketLock(volumeName, bucketName); + } + return new OMBucketDeleteResponse(volumeName, bucketName, + omResponse.build()); + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMBucketSetPropertyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMBucketSetPropertyRequest.java new file mode 100644 index 0000000000000..f9731e466a3fd --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMBucketSetPropertyRequest.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.request; + +import java.io.IOException; +import java.util.List; + +import com.google.common.base.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import org.apache.hadoop.hdds.protocol.StorageType; +import org.apache.hadoop.ozone.OzoneAcl; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OMMetrics; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.helpers.KeyValueUtil; +import org.apache.hadoop.ozone.om.helpers.OmBucketArgs; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.response.OMBucketSetPropertyResponse; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .BucketArgs; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMResponse; +import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer; +import org.apache.hadoop.ozone.security.acl.OzoneObj; + +import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; +import org.apache.hadoop.utils.db.cache.CacheKey; +import org.apache.hadoop.utils.db.cache.CacheValue; + +/** + * Handle SetBucketProperty Request. + */ +public class OMBucketSetPropertyRequest extends OMClientRequest { + private static final Logger LOG = + LoggerFactory.getLogger(OMBucketSetPropertyRequest.class); + + public OMBucketSetPropertyRequest(OMRequest omRequest) { + super(omRequest); + } + @Override + public OMRequest preExecute(OzoneManager ozoneManager) throws IOException { + return getOmRequest().toBuilder().setUgiInfo(getUgiInfo()).build(); + } + + @Override + public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, + long transactionLogIndex) { + + OMMetrics omMetrics = ozoneManager.getOmMetrics(); + + if (omMetrics != null) { + omMetrics.incNumBucketUpdates(); + } + + OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager(); + + BucketArgs bucketArgs = + getOmRequest().getSetBucketPropertyRequest().getBucketArgs(); + OmBucketArgs omBucketArgs = OmBucketArgs.getFromProtobuf(bucketArgs); + + String volumeName = bucketArgs.getVolumeName(); + String bucketName = bucketArgs.getBucketName(); + + OMResponse.Builder omResponse = OMResponse.newBuilder().setCmdType( + OzoneManagerProtocolProtos.Type.CreateBucket).setStatus( + OzoneManagerProtocolProtos.Status.OK); + OmBucketInfo omBucketInfo = null; + + try { + // check Acl + if (ozoneManager.getAclsEnabled()) { + checkAcls(ozoneManager, OzoneObj.ResourceType.BUCKET, + OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE, + volumeName, bucketName, null); + } + + // acquire lock + omMetadataManager.getLock().acquireBucketLock(volumeName, bucketName); + + String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName); + OmBucketInfo oldBucketInfo = + omMetadataManager.getBucketTable().get(bucketKey); + //Check if bucket exist + if (oldBucketInfo == null) { + LOG.debug("bucket: {} not found ", bucketName); + throw new OMException("Bucket doesn't exist", + OMException.ResultCodes.BUCKET_NOT_FOUND); + } + OmBucketInfo.Builder bucketInfoBuilder = OmBucketInfo.newBuilder(); + bucketInfoBuilder.setVolumeName(oldBucketInfo.getVolumeName()) + .setBucketName(oldBucketInfo.getBucketName()); + bucketInfoBuilder.addAllMetadata(KeyValueUtil + .getFromProtobuf(bucketArgs.getMetadataList())); + + //Check ACLs to update + if (omBucketArgs.getAddAcls() != null || + omBucketArgs.getRemoveAcls() != null) { + bucketInfoBuilder.setAcls(getUpdatedAclList(oldBucketInfo.getAcls(), + omBucketArgs.getRemoveAcls(), omBucketArgs.getAddAcls())); + LOG.debug("Updating ACLs for bucket: {} in volume: {}", + bucketName, volumeName); + } else { + bucketInfoBuilder.setAcls(oldBucketInfo.getAcls()); + } + + //Check StorageType to update + StorageType storageType = omBucketArgs.getStorageType(); + if (storageType != null) { + bucketInfoBuilder.setStorageType(storageType); + LOG.debug("Updating bucket storage type for bucket: {} in volume: {}", + bucketName, volumeName); + } else { + bucketInfoBuilder.setStorageType(oldBucketInfo.getStorageType()); + } + + //Check Versioning to update + Boolean versioning = omBucketArgs.getIsVersionEnabled(); + if (versioning != null) { + bucketInfoBuilder.setIsVersionEnabled(versioning); + LOG.debug("Updating bucket versioning for bucket: {} in volume: {}", + bucketName, volumeName); + } else { + bucketInfoBuilder + .setIsVersionEnabled(oldBucketInfo.getIsVersionEnabled()); + } + bucketInfoBuilder.setCreationTime(oldBucketInfo.getCreationTime()); + + omBucketInfo = bucketInfoBuilder.build(); + + // Update table cache. + omMetadataManager.getBucketTable().addCacheEntry( + new CacheKey<>(bucketKey), + new CacheValue<>(Optional.of(omBucketInfo), transactionLogIndex)); + } catch (IOException ex) { + if (omMetrics != null) { + omMetrics.incNumBucketUpdateFails(); + } + LOG.error("Setting bucket property failed for bucket:{} in volume:{}", + bucketName, volumeName, ex); + omResponse.setSuccess(false).setMessage(ex.getMessage()) + .setStatus(OzoneManagerRatisUtils.exceptionToResponseStatus(ex)); + } finally { + omMetadataManager.getLock().releaseBucketLock(volumeName, bucketName); + } + return new OMBucketSetPropertyResponse(omBucketInfo, omResponse.build()); + } + + /** + * Updates the existing ACL list with remove and add ACLs that are passed. + * Remove is done before Add. + * + * @param existingAcls - old ACL list. + * @param removeAcls - ACLs to be removed. + * @param addAcls - ACLs to be added. + * @return updated ACL list. + */ + private List< OzoneAcl > getUpdatedAclList(List existingAcls, + List removeAcls, List addAcls) { + if (removeAcls != null && !removeAcls.isEmpty()) { + existingAcls.removeAll(removeAcls); + } + if (addAcls != null && !addAcls.isEmpty()) { + addAcls.stream().filter(acl -> !existingAcls.contains(acl)).forEach( + existingAcls::add); + } + return existingAcls; + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java new file mode 100644 index 0000000000000..bacc8abb0902d --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.request; + +import java.io.IOException; +import java.net.InetAddress; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMRequest; +import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer; +import org.apache.hadoop.ozone.security.acl.OzoneObj; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * OMClientRequest provides methods which every write OM request should + * implement. + */ +public abstract class OMClientRequest { + + private OMRequest omRequest; + + public OMClientRequest(OMRequest omRequest) { + Preconditions.checkNotNull(omRequest); + this.omRequest = omRequest; + } + /** + * Perform pre-execute steps on a OMRequest. + * + * Called from the RPC context, and generates a OMRequest object which has + * all the information that will be either persisted + * in RocksDB or returned to the caller once this operation + * is executed. + * + * @return OMRequest that will be serialized and handed off to Ratis for + * consensus. + */ + public abstract OMRequest preExecute(OzoneManager ozoneManager) + throws IOException; + + /** + * Validate the OMRequest and update the cache. + * This step should verify that the request can be executed, perform + * any authorization steps and update the in-memory cache. + + * This step does not persist the changes to the database. + * + * @return the response that will be returned to the client. + */ + public abstract OMClientResponse validateAndUpdateCache( + OzoneManager ozoneManager, long transactionLogIndex); + + @VisibleForTesting + public OMRequest getOmRequest() { + return omRequest; + } + + /** + * Get UGI information from the OMRequest. + * @return + */ + public OzoneManagerProtocolProtos.UgiInfo getUgiInfo() { + UserGroupInformation user = ProtobufRpcEngine.Server.getRemoteUser(); + InetAddress remoteAddress = ProtobufRpcEngine.Server.getRemoteIp(); + OzoneManagerProtocolProtos.UgiInfo.Builder ugi = + OzoneManagerProtocolProtos.UgiInfo.newBuilder(); + + if (user != null) { + ugi.setUserName(user.getUserName()); + } + + if (remoteAddress != null) { + ugi.setRemoteAddress(remoteAddress.getHostAddress()).build(); + } + + return ugi.build(); + } + + /** + * Check Acls of ozone object. + * @param ozoneManager + * @param resType + * @param storeType + * @param aclType + * @param vol + * @param bucket + * @param key + * @throws IOException + */ + public void checkAcls(OzoneManager ozoneManager, + OzoneObj.ResourceType resType, + OzoneObj.StoreType storeType, IAccessAuthorizer.ACLType aclType, + String vol, String bucket, String key) throws IOException { + ozoneManager.checkAcls(resType, storeType, aclType, vol, bucket, key, + createUGI(), InetAddress.getByName(omRequest.getUgiInfo() + .getRemoteAddress())); + } + + private UserGroupInformation createUGI() { + return UserGroupInformation.createRemoteUser( + omRequest.getUgiInfo().getUserName()); + } + +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/package-info.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/package-info.java new file mode 100644 index 0000000000000..6146d74b57092 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains classes for handling OMRequest's. + */ \ No newline at end of file diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMBucketCreateResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMBucketCreateResponse.java new file mode 100644 index 0000000000000..78d13ad8a389b --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMBucketCreateResponse.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.response; + +import java.io.IOException; + +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMResponse; +import org.apache.hadoop.utils.db.BatchOperation; + +/** + * Response for CreateBucket request. + */ +public final class OMBucketCreateResponse extends OMClientResponse { + + private final OmBucketInfo omBucketInfo; + + public OMBucketCreateResponse(OmBucketInfo omBucketInfo, + OMResponse omResponse) { + super(omResponse); + this.omBucketInfo = omBucketInfo; + } + + @Override + public void addToDBBatch(OMMetadataManager omMetadataManager, + BatchOperation batchOperation) throws IOException { + String dbBucketKey = + omMetadataManager.getBucketKey(omBucketInfo.getVolumeName(), + omBucketInfo.getBucketName()); + omMetadataManager.getBucketTable().putWithBatch(batchOperation, dbBucketKey, + omBucketInfo); + } + + public OmBucketInfo getOmBucketInfo() { + return omBucketInfo; + } + +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMBucketDeleteResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMBucketDeleteResponse.java new file mode 100644 index 0000000000000..1fbed26599c69 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMBucketDeleteResponse.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.response; + +import java.io.IOException; + +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.utils.db.BatchOperation; + +/** + * Response for DeleteBucket request. + */ +public final class OMBucketDeleteResponse extends OMClientResponse { + + private String volumeName; + private String bucketName; + + public OMBucketDeleteResponse( + String volumeName, String bucketName, + OzoneManagerProtocolProtos.OMResponse omResponse) { + super(omResponse); + this.volumeName = volumeName; + this.bucketName = bucketName; + } + + @Override + public void addToDBBatch(OMMetadataManager omMetadataManager, + BatchOperation batchOperation) throws IOException { + String dbBucketKey = + omMetadataManager.getBucketKey(volumeName, bucketName); + omMetadataManager.getBucketTable().deleteWithBatch(batchOperation, + dbBucketKey); + } + + public String getVolumeName() { + return volumeName; + } + + public String getBucketName() { + return bucketName; + } + +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMBucketSetPropertyResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMBucketSetPropertyResponse.java new file mode 100644 index 0000000000000..cc4bd05d879f1 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMBucketSetPropertyResponse.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.om.response; + +import java.io.IOException; + +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMResponse; +import org.apache.hadoop.utils.db.BatchOperation; + +/** + * Response for SetBucketProperty request. + */ +public class OMBucketSetPropertyResponse extends OMClientResponse { + private OmBucketInfo omBucketInfo; + + public OMBucketSetPropertyResponse(OmBucketInfo omBucketInfo, + OMResponse omResponse) { + super(omResponse); + this.omBucketInfo = omBucketInfo; + } + + @Override + public void addToDBBatch(OMMetadataManager omMetadataManager, + BatchOperation batchOperation) throws IOException { + String dbBucketKey = + omMetadataManager.getBucketKey(omBucketInfo.getVolumeName(), + omBucketInfo.getBucketName()); + omMetadataManager.getBucketTable().putWithBatch(batchOperation, dbBucketKey, + omBucketInfo); + } + +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMClientResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMClientResponse.java new file mode 100644 index 0000000000000..bff68ef904848 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMClientResponse.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.response; + +import java.io.IOException; + +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMResponse; +import org.apache.hadoop.utils.db.BatchOperation; + +/** + * Interface for OM Responses, each OM response should implement this interface. + */ +public abstract class OMClientResponse { + + private OMResponse omResponse; + + public OMClientResponse(OMResponse omResponse) { + this.omResponse = omResponse; + } + + /** + * Implement logic to add the response to batch. + * @param omMetadataManager + * @param batchOperation + * @throws IOException + */ + public abstract void addToDBBatch(OMMetadataManager omMetadataManager, + BatchOperation batchOperation) throws IOException; + + /** + * Return OMResponse. + * @return OMResponse + */ + public OMResponse getOMResponse() { + return omResponse; + } + +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMVolumeCreateResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMVolumeCreateResponse.java new file mode 100644 index 0000000000000..260f6c75b2fa8 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMVolumeCreateResponse.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.response; + +import java.io.IOException; + +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .VolumeList; + +import org.apache.hadoop.utils.db.BatchOperation; + +/** + * Response for CreateBucket request. + */ +public class OMVolumeCreateResponse extends OMClientResponse { + + private VolumeList volumeList; + private OmVolumeArgs omVolumeArgs; + + public OMVolumeCreateResponse(OmVolumeArgs omVolumeArgs, + VolumeList volumeList, OMResponse omResponse) { + super(omResponse); + this.omVolumeArgs = omVolumeArgs; + this.volumeList = volumeList; + } + @Override + public void addToDBBatch(OMMetadataManager omMetadataManager, + BatchOperation batchOperation) throws IOException { + + String dbVolumeKey = + omMetadataManager.getVolumeKey(omVolumeArgs.getVolume()); + String dbUserKey = + omMetadataManager.getUserKey(omVolumeArgs.getOwnerName()); + + omMetadataManager.getVolumeTable().putWithBatch(batchOperation, dbVolumeKey, + omVolumeArgs); + omMetadataManager.getUserTable().putWithBatch(batchOperation, dbUserKey, + volumeList); + } + + public VolumeList getVolumeList() { + return volumeList; + } + + public OmVolumeArgs getOmVolumeArgs() { + return omVolumeArgs; + } + +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMVolumeDeleteResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMVolumeDeleteResponse.java new file mode 100644 index 0000000000000..690751550522d --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMVolumeDeleteResponse.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.response; + +import java.io.IOException; + +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .VolumeList; +import org.apache.hadoop.utils.db.BatchOperation; + +/** + * Response for CreateVolume request. + */ +public class OMVolumeDeleteResponse extends OMClientResponse { + private String volume; + private String owner; + private VolumeList updatedVolumeList; + + public OMVolumeDeleteResponse(String volume, String owner, + VolumeList updatedVolumeList, OMResponse omResponse) { + super(omResponse); + this.volume = volume; + this.owner = owner; + this.updatedVolumeList = updatedVolumeList; + } + + @Override + public void addToDBBatch(OMMetadataManager omMetadataManager, + BatchOperation batchOperation) throws IOException { + String dbUserKey = omMetadataManager.getUserKey(owner); + VolumeList volumeList = updatedVolumeList; + if (updatedVolumeList.getVolumeNamesList().size() == 0) { + omMetadataManager.getUserTable().deleteWithBatch(batchOperation, + dbUserKey); + } else { + omMetadataManager.getUserTable().putWithBatch(batchOperation, dbUserKey, + volumeList); + } + omMetadataManager.getVolumeTable().deleteWithBatch(batchOperation, + omMetadataManager.getVolumeKey(volume)); + } + +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/package-info.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/package-info.java new file mode 100644 index 0000000000000..d66cac7c021c2 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/package-info.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.response; + + +/** + * This package contains classes for the OM Responses. + */ \ No newline at end of file diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandler.java index 1ccac3bedc34b..6a992050e266a 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandler.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandler.java @@ -41,8 +41,10 @@ public interface OzoneManagerHARequestHandler extends RequestHandler { /** * Handle Apply Transaction Requests from OzoneManager StateMachine. * @param omRequest + * @param transactionLogIndex - ratis transaction log index * @return OMResponse */ - OMResponse handleApplyTransaction(OMRequest omRequest); + OMResponse handleApplyTransaction(OMRequest omRequest, + long transactionLogIndex); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java index 9dd27b83d183c..3a6d0df4ed150 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java @@ -19,25 +19,19 @@ import java.io.IOException; -import org.apache.hadoop.ozone.om.helpers.OmBucketArgs; -import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.helpers.OmDeleteVolumeResponse; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.helpers.OmVolumeOwnerChangeResponse; -import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol; +import org.apache.hadoop.ozone.om.ratis.OzoneManagerDoubleBuffer; +import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; +import org.apache.hadoop.ozone.om.request.OMClientRequest; +import org.apache.hadoop.ozone.om.response.OMClientResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .CreateBucketRequest; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .CreateBucketResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .CreateVolumeRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .CreateVolumeResponse; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .DeleteBucketRequest; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .DeleteBucketResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .DeleteVolumeRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos @@ -46,10 +40,6 @@ .OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .OMResponse; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .SetBucketPropertyRequest; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .SetBucketPropertyResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .SetVolumePropertyRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos @@ -70,8 +60,12 @@ public class OzoneManagerHARequestHandlerImpl extends OzoneManagerRequestHandler implements OzoneManagerHARequestHandler { - public OzoneManagerHARequestHandlerImpl(OzoneManagerServerProtocol om) { + private OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer; + + public OzoneManagerHARequestHandlerImpl(OzoneManager om, + OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer) { super(om); + this.ozoneManagerDoubleBuffer = ozoneManagerDoubleBuffer; } @Override @@ -90,15 +84,6 @@ public OMRequest handleStartTransaction(OMRequest omRequest) case DeleteVolume: newOmRequest = handleDeleteVolumeStart(omRequest); break; - case CreateBucket: - newOmRequest = handleCreateBucketStart(omRequest); - break; - case SetBucketProperty: - newOmRequest = handleSetBucketPropertyStart(omRequest); - break; - case DeleteBucket: - newOmRequest = handleDeleteBucketRequestStart(omRequest); - break; default: throw new IOException("Unrecognized Command Type:" + cmdType); } @@ -107,7 +92,8 @@ public OMRequest handleStartTransaction(OMRequest omRequest) @Override - public OMResponse handleApplyTransaction(OMRequest omRequest) { + public OMResponse handleApplyTransaction(OMRequest omRequest, + long transactionLogIndex) { LOG.debug("Received OMRequest: {}, ", omRequest); Type cmdType = omRequest.getCmdType(); OMResponse.Builder responseBuilder = @@ -128,17 +114,26 @@ public OMResponse handleApplyTransaction(OMRequest omRequest) { handleDeleteVolumeApply(omRequest)); break; case CreateBucket: - responseBuilder.setCreateBucketResponse( - handleCreateBucketApply(omRequest)); - break; - case SetBucketProperty: - responseBuilder.setSetBucketPropertyResponse( - handleSetBucketPropertyApply(omRequest)); - break; case DeleteBucket: - responseBuilder.setDeleteBucketResponse( - handleDeleteBucketApply(omRequest)); - break; + case SetBucketProperty: + //TODO: We don't need to pass transactionID, this will be removed when + // complete write requests is changed to new model. And also we can + // return OMClientResponse, then adding to doubleBuffer can be taken + // care by stateMachine. And also integrate both HA and NON HA code + // paths. + OMClientRequest omClientRequest = + OzoneManagerRatisUtils.createClientRequest(omRequest); + OMClientResponse omClientResponse = + omClientRequest.validateAndUpdateCache(getOzoneManager(), + transactionLogIndex); + + // If any error we have got when validateAndUpdateCache, OMResponse + // Status is set with Error Code other than OK, in that case don't + // add this to double buffer. + if (omClientResponse.getOMResponse().getStatus() == Status.OK) { + ozoneManagerDoubleBuffer.add(omClientResponse, transactionLogIndex); + } + return omClientResponse.getOMResponse(); default: // As all request types are not changed so we need to call handle // here. @@ -160,7 +155,7 @@ private OMRequest handleCreateVolumeStart(OMRequest omRequest) throws IOException { VolumeInfo volumeInfo = omRequest.getCreateVolumeRequest().getVolumeInfo(); OzoneManagerProtocolProtos.VolumeList volumeList = - getOzoneManagerServerProtocol().startCreateVolume( + getOzoneManager().startCreateVolume( OmVolumeArgs.getFromProtobuf(volumeInfo)); CreateVolumeRequest createVolumeRequest = @@ -176,7 +171,7 @@ private CreateVolumeResponse handleCreateVolumeApply(OMRequest omRequest) omRequest.getCreateVolumeRequest().getVolumeInfo(); VolumeList volumeList = omRequest.getCreateVolumeRequest().getVolumeList(); - getOzoneManagerServerProtocol().applyCreateVolume( + getOzoneManager().applyCreateVolume( OmVolumeArgs.getFromProtobuf(volumeInfo), volumeList); return CreateVolumeResponse.newBuilder().build(); @@ -191,7 +186,7 @@ private OMRequest handleSetVolumePropertyStart(OMRequest omRequest) if (setVolumePropertyRequest.hasQuotaInBytes()) { long quota = setVolumePropertyRequest.getQuotaInBytes(); OmVolumeArgs omVolumeArgs = - getOzoneManagerServerProtocol().startSetQuota(volume, quota); + getOzoneManager().startSetQuota(volume, quota); SetVolumePropertyRequest newSetVolumePropertyRequest = SetVolumePropertyRequest.newBuilder().setVolumeName(volume) .setVolumeInfo(omVolumeArgs.getProtobuf()).build(); @@ -201,7 +196,7 @@ private OMRequest handleSetVolumePropertyStart(OMRequest omRequest) } else { String owner = setVolumePropertyRequest.getOwnerName(); OmVolumeOwnerChangeResponse omVolumeOwnerChangeResponse = - getOzoneManagerServerProtocol().startSetOwner(volume, owner); + getOzoneManager().startSetOwner(volume, owner); // If volumeLists become large and as ratis writes the request to disk we // might take more space if the lists become very big in size. We might // need to revisit this if it becomes problem @@ -230,11 +225,11 @@ private SetVolumePropertyResponse handleSetVolumePropertyApply( omRequest.getSetVolumePropertyRequest(); if (setVolumePropertyRequest.hasQuotaInBytes()) { - getOzoneManagerServerProtocol().applySetQuota( + getOzoneManager().applySetQuota( OmVolumeArgs.getFromProtobuf( setVolumePropertyRequest.getVolumeInfo())); } else { - getOzoneManagerServerProtocol().applySetOwner( + getOzoneManager().applySetOwner( setVolumePropertyRequest.getOriginalOwner(), setVolumePropertyRequest.getOldOwnerVolumeList(), setVolumePropertyRequest.getNewOwnerVolumeList(), @@ -252,7 +247,7 @@ private OMRequest handleDeleteVolumeStart(OMRequest omRequest) String volume = deleteVolumeRequest.getVolumeName(); OmDeleteVolumeResponse omDeleteVolumeResponse = - getOzoneManagerServerProtocol().startDeleteVolume(volume); + getOzoneManager().startDeleteVolume(volume); DeleteVolumeRequest newDeleteVolumeRequest = DeleteVolumeRequest.newBuilder().setVolumeList( @@ -272,97 +267,10 @@ private DeleteVolumeResponse handleDeleteVolumeApply(OMRequest omRequest) DeleteVolumeRequest deleteVolumeRequest = omRequest.getDeleteVolumeRequest(); - getOzoneManagerServerProtocol().applyDeleteVolume( + getOzoneManager().applyDeleteVolume( deleteVolumeRequest.getVolumeName(), deleteVolumeRequest.getOwner(), deleteVolumeRequest.getVolumeList()); return DeleteVolumeResponse.newBuilder().build(); } - - private OMRequest handleCreateBucketStart(OMRequest omRequest) - throws IOException { - - CreateBucketRequest createBucketRequest = - omRequest.getCreateBucketRequest(); - - OmBucketInfo omBucketInfo = - getOzoneManagerServerProtocol().startCreateBucket( - OmBucketInfo.getFromProtobuf(createBucketRequest.getBucketInfo())); - - CreateBucketRequest newCreateBucketRequest = - CreateBucketRequest.newBuilder().setBucketInfo( - omBucketInfo.getProtobuf()).build(); - return omRequest.toBuilder().setCreateBucketRequest(newCreateBucketRequest) - .build(); - - } - - - private CreateBucketResponse handleCreateBucketApply(OMRequest omRequest) - throws IOException { - CreateBucketRequest createBucketRequest = - omRequest.getCreateBucketRequest(); - - getOzoneManagerServerProtocol().applyCreateBucket( - OmBucketInfo.getFromProtobuf(createBucketRequest.getBucketInfo())); - - return CreateBucketResponse.newBuilder().build(); - } - - - private OMRequest handleDeleteBucketRequestStart(OMRequest omRequest) - throws IOException { - - DeleteBucketRequest deleteBucketRequest = - omRequest.getDeleteBucketRequest(); - getOzoneManagerServerProtocol().startDeleteBucket( - deleteBucketRequest.getVolumeName(), - deleteBucketRequest.getBucketName()); - - return omRequest; - } - - private DeleteBucketResponse handleDeleteBucketApply(OMRequest omRequest) - throws IOException { - - DeleteBucketRequest deleteBucketRequest = - omRequest.getDeleteBucketRequest(); - - getOzoneManagerServerProtocol().applyDeleteBucket( - deleteBucketRequest.getVolumeName(), - deleteBucketRequest.getBucketName()); - - return DeleteBucketResponse.newBuilder().build(); - } - - private OMRequest handleSetBucketPropertyStart( - OMRequest omRequest) throws IOException { - SetBucketPropertyRequest setBucketPropertyRequest = - omRequest.getSetBucketPropertyRequest(); - - OmBucketInfo omBucketInfo = - getOzoneManagerServerProtocol().startSetBucketProperty( - OmBucketArgs.getFromProtobuf(setBucketPropertyRequest.getBucketArgs())); - - SetBucketPropertyRequest newSetBucketPropertyRequest = - SetBucketPropertyRequest.newBuilder() - .setBucketInfo(omBucketInfo.getProtobuf()).build(); - - return omRequest.toBuilder().setSetBucketPropertyRequest( - newSetBucketPropertyRequest).build(); - } - - private SetBucketPropertyResponse handleSetBucketPropertyApply( - OMRequest omRequest) throws IOException { - SetBucketPropertyRequest setBucketPropertyRequest = - omRequest.getSetBucketPropertyRequest(); - - getOzoneManagerServerProtocol().applySetBucketProperty( - OmBucketInfo.getFromProtobuf(setBucketPropertyRequest.getBucketInfo())); - - return SetBucketPropertyResponse.newBuilder().build(); - } - - - } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java index 72b4d12e73018..4d3002d3a35eb 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java @@ -16,13 +16,16 @@ */ package org.apache.hadoop.ozone.protocolPB; + import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.ozone.OmUtils; +import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.exceptions.NotLeaderException; -import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol; import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB; -import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisClient; import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; +import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; +import org.apache.hadoop.ozone.om.request.OMClientRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; @@ -33,6 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.Optional; /** @@ -45,9 +49,9 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements private static final Logger LOG = LoggerFactory .getLogger(OzoneManagerProtocolServerSideTranslatorPB.class); private final OzoneManagerRatisServer omRatisServer; - private final OzoneManagerRatisClient omRatisClient; private final RequestHandler handler; private final boolean isRatisEnabled; + private final OzoneManager ozoneManager; /** * Constructs an instance of the server handler. @@ -55,12 +59,13 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements * @param impl OzoneManagerProtocolPB */ public OzoneManagerProtocolServerSideTranslatorPB( - OzoneManagerServerProtocol impl, OzoneManagerRatisServer ratisServer, - OzoneManagerRatisClient ratisClient, boolean enableRatis) { + OzoneManager impl, OzoneManagerRatisServer ratisServer, + boolean enableRatis) { + this.ozoneManager = impl; handler = new OzoneManagerRequestHandler(impl); this.omRatisServer = ratisServer; - this.omRatisClient = ratisClient; this.isRatisEnabled = enableRatis; + } /** @@ -80,6 +85,17 @@ public OMResponse submitRequest(RpcController controller, if (OmUtils.isReadOnly(request)) { return submitReadRequestToOM(request); } else { + // PreExecute if needed. + try { + OMClientRequest omClientRequest = + OzoneManagerRatisUtils.createClientRequest(request); + if (omClientRequest != null) { + request = omClientRequest.preExecute(ozoneManager); + } + } catch (IOException ex) { + // As some of the preExecute returns error. So handle here. + return createErrorResponse(request, ex); + } return submitRequestToRatis(request); } } else { @@ -89,12 +105,47 @@ public OMResponse submitRequest(RpcController controller, scope.close(); } } + + /** + * Create OMResponse from the specified OMRequest and exception. + * @param omRequest + * @param exception + * @return OMResponse + */ + private OMResponse createErrorResponse( + OMRequest omRequest, IOException exception) { + OzoneManagerProtocolProtos.Type cmdType = omRequest.getCmdType(); + switch (cmdType) { + case CreateBucket: + OMResponse.Builder omResponse = OMResponse.newBuilder() + .setStatus( + OzoneManagerRatisUtils.exceptionToResponseStatus(exception)) + .setCmdType(cmdType) + .setMessage(exception.getMessage()) + .setSuccess(false); + if (exception.getMessage() != null) { + omResponse.setMessage(exception.getMessage()); + } + return omResponse.build(); + case DeleteBucket: + case SetBucketProperty: + // In these cases, we can return null. As this method is called when + // some error occurred in preExecute. For these request types + // preExecute is do nothing. + return null; + default: + // We shall never come here. + return null; + } + } /** * Submits request to OM's Ratis server. */ private OMResponse submitRequestToRatis(OMRequest request) throws ServiceException { - return omRatisClient.sendCommand(request); + //TODO: Need to remove OzoneManagerRatisClient, as now we are using + // RatisServer Api's. + return omRatisServer.submitRequest(request); } private OMResponse submitReadRequestToOM(OMRequest request) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java index 1b95a2eea90ef..67b0755f93d5c 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.io.Text; +import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.OmBucketArgs; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; @@ -42,7 +43,6 @@ import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus; import org.apache.hadoop.ozone.om.helpers.ServiceInfo; -import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFileStatusRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFileStatusResponse; @@ -134,9 +134,9 @@ public class OzoneManagerRequestHandler implements RequestHandler { static final Logger LOG = LoggerFactory.getLogger(OzoneManagerRequestHandler.class); - private final OzoneManagerServerProtocol impl; + private final OzoneManager impl; - public OzoneManagerRequestHandler(OzoneManagerServerProtocol om) { + public OzoneManagerRequestHandler(OzoneManager om) { this.impl = om; } @@ -1034,10 +1034,6 @@ private OzoneManagerProtocolProtos.LookupFileResponse lookupFile( .build(); } - protected OzoneManagerServerProtocol getOzoneManagerServerProtocol() { - return impl; - } - private OzoneManagerProtocolProtos.ListStatusResponse listStatus( OzoneManagerProtocolProtos.ListStatusRequest request) throws IOException { KeyArgs keyArgs = request.getKeyArgs(); @@ -1057,4 +1053,8 @@ private OzoneManagerProtocolProtos.ListStatusResponse listStatus( } return listStatusResponseBuilder.build(); } + + protected OzoneManager getOzoneManager() { + return impl; + } } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java new file mode 100644 index 0000000000000..3488ea30871a8 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java @@ -0,0 +1,419 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.ratis; + +import java.io.IOException; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicLong; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import org.apache.hadoop.ozone.om.response.OMBucketCreateResponse; +import org.apache.hadoop.ozone.om.response.OMBucketDeleteResponse; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.utils.db.BatchOperation; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; +import org.apache.hadoop.ozone.om.response.OMVolumeCreateResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.Time; + +import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS; +import static org.junit.Assert.fail; + +/** + * This class tests OzoneManagerDouble Buffer. + */ +public class TestOzoneManagerDoubleBuffer { + + private OMMetadataManager omMetadataManager; + private OzoneManagerDoubleBuffer doubleBuffer; + private AtomicLong trxId = new AtomicLong(0); + private OzoneManagerRatisSnapshot ozoneManagerRatisSnapshot; + private long lastAppliedIndex; + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + private void setup() throws IOException { + OzoneConfiguration configuration = new OzoneConfiguration(); + configuration.set(OZONE_METADATA_DIRS, + folder.newFolder().getAbsolutePath()); + omMetadataManager = + new OmMetadataManagerImpl(configuration); + ozoneManagerRatisSnapshot = index -> { + lastAppliedIndex = index; + return index; + }; + doubleBuffer = new OzoneManagerDoubleBuffer(omMetadataManager, + ozoneManagerRatisSnapshot); + } + + private void stop() { + doubleBuffer.stop(); + } + + @Test(timeout = 300_000) + public void testDoubleBufferWithDummyResponse() throws Exception { + try { + setup(); + String volumeName = UUID.randomUUID().toString(); + int bucketCount = 100; + for (int i=0; i < bucketCount; i++) { + doubleBuffer.add(createDummyBucketResponse(volumeName, + UUID.randomUUID().toString()), trxId.incrementAndGet()); + } + GenericTestUtils.waitFor(() -> + doubleBuffer.getFlushedTransactionCount() == bucketCount, 100, + 120000); + Assert.assertTrue(omMetadataManager.countRowsInTable( + omMetadataManager.getBucketTable()) == (bucketCount)); + Assert.assertTrue(doubleBuffer.getFlushIterations() > 0); + + // Check lastAppliedIndex is updated correctly or not. + Assert.assertEquals(bucketCount, lastAppliedIndex); + } finally { + stop(); + } + } + + + @Test(timeout = 300_000) + public void testDoubleBuffer() throws Exception { + // This test checks whether count in tables are correct or not. + testDoubleBuffer(1, 10); + testDoubleBuffer(10, 100); + testDoubleBuffer(100, 100); + testDoubleBuffer(1000, 1000); + } + + + + @Test + public void testDoubleBufferWithMixOfTransactions() throws Exception { + // This test checks count, data in table is correct or not. + try { + setup(); + + Queue< OMBucketCreateResponse > bucketQueue = + new ConcurrentLinkedQueue<>(); + Queue< OMBucketDeleteResponse > deleteBucketQueue = + new ConcurrentLinkedQueue<>(); + + String volumeName = UUID.randomUUID().toString(); + OMVolumeCreateResponse omVolumeCreateResponse = createVolume(volumeName); + doubleBuffer.add(omVolumeCreateResponse, trxId.incrementAndGet()); + + + int bucketCount = 10; + + doMixTransactions(volumeName, 10, deleteBucketQueue, bucketQueue); + + // As for every 2 transactions of create bucket we add deleted bucket. + final int deleteCount = 5; + + // We are doing +1 for volume transaction. + GenericTestUtils.waitFor(() -> + doubleBuffer.getFlushedTransactionCount() == + (bucketCount + deleteCount + 1), 100, 120000); + + Assert.assertTrue(omMetadataManager.countRowsInTable( + omMetadataManager.getVolumeTable()) == 1); + + Assert.assertTrue(omMetadataManager.countRowsInTable( + omMetadataManager.getBucketTable()) == 5); + + // Now after this in our DB we should have 5 buckets and one volume + + + checkVolume(volumeName, omVolumeCreateResponse); + + checkCreateBuckets(bucketQueue); + + checkDeletedBuckets(deleteBucketQueue); + } finally { + stop(); + } + } + + @Test + public void testDoubleBufferWithMixOfTransactionsParallel() throws Exception { + // This test checks count, data in table is correct or not. + try { + setup(); + + Queue< OMBucketCreateResponse > bucketQueue = + new ConcurrentLinkedQueue<>(); + Queue< OMBucketDeleteResponse > deleteBucketQueue = + new ConcurrentLinkedQueue<>(); + + String volumeName1 = UUID.randomUUID().toString(); + OMVolumeCreateResponse omVolumeCreateResponse1 = + createVolume(volumeName1); + + String volumeName2 = UUID.randomUUID().toString(); + OMVolumeCreateResponse omVolumeCreateResponse2 = + createVolume(volumeName2); + + doubleBuffer.add(omVolumeCreateResponse1, trxId.incrementAndGet()); + + doubleBuffer.add(omVolumeCreateResponse2, trxId.incrementAndGet()); + + Daemon daemon1 = new Daemon(() -> doMixTransactions(volumeName1, 10, + deleteBucketQueue, bucketQueue)); + Daemon daemon2 = new Daemon(() -> doMixTransactions(volumeName2, 10, + deleteBucketQueue, bucketQueue)); + + daemon1.start(); + daemon2.start(); + + int bucketCount = 20; + + // As for every 2 transactions of create bucket we add deleted bucket. + final int deleteCount = 10; + + // We are doing +1 for volume transaction. + GenericTestUtils.waitFor(() -> doubleBuffer.getFlushedTransactionCount() + == (bucketCount + deleteCount + 2), + 100, + 120000); + + Assert.assertTrue(omMetadataManager.countRowsInTable( + omMetadataManager.getVolumeTable()) == 2); + + Assert.assertTrue(omMetadataManager.countRowsInTable( + omMetadataManager.getBucketTable()) == 10); + + // Now after this in our DB we should have 5 buckets and one volume + + + checkVolume(volumeName1, omVolumeCreateResponse1); + checkVolume(volumeName2, omVolumeCreateResponse2); + + checkCreateBuckets(bucketQueue); + + checkDeletedBuckets(deleteBucketQueue); + } finally { + stop(); + } + } + + + private void doMixTransactions(String volumeName, int bucketCount, + Queue deleteBucketQueue, + Queue bucketQueue) { + for (int i=0; i < bucketCount; i++) { + String bucketName = UUID.randomUUID().toString(); + OMBucketCreateResponse omBucketCreateResponse = createBucket(volumeName, + bucketName); + doubleBuffer.add(omBucketCreateResponse, trxId.incrementAndGet()); + // For every 2 transactions have a deleted bucket. + if (i % 2 == 0) { + OMBucketDeleteResponse omBucketDeleteResponse = + deleteBucket(volumeName, bucketName); + doubleBuffer.add(omBucketDeleteResponse, trxId.incrementAndGet()); + deleteBucketQueue.add(omBucketDeleteResponse); + } else { + bucketQueue.add(omBucketCreateResponse); + } + } + } + + private void checkVolume(String volumeName, + OMVolumeCreateResponse omVolumeCreateResponse) throws Exception { + OmVolumeArgs tableVolumeArgs = omMetadataManager.getVolumeTable().get( + omMetadataManager.getVolumeKey(volumeName)); + Assert.assertTrue(tableVolumeArgs != null); + + OmVolumeArgs omVolumeArgs = omVolumeCreateResponse.getOmVolumeArgs(); + + Assert.assertEquals(omVolumeArgs.getVolume(), tableVolumeArgs.getVolume()); + Assert.assertEquals(omVolumeArgs.getAdminName(), + tableVolumeArgs.getAdminName()); + Assert.assertEquals(omVolumeArgs.getOwnerName(), + tableVolumeArgs.getOwnerName()); + Assert.assertEquals(omVolumeArgs.getCreationTime(), + tableVolumeArgs.getCreationTime()); + } + + private void checkCreateBuckets(Queue bucketQueue) { + bucketQueue.forEach((omBucketCreateResponse) -> { + OmBucketInfo omBucketInfo = omBucketCreateResponse.getOmBucketInfo(); + String bucket = omBucketInfo.getBucketName(); + OmBucketInfo tableBucketInfo = null; + try { + tableBucketInfo = + omMetadataManager.getBucketTable().get( + omMetadataManager.getBucketKey(omBucketInfo.getVolumeName(), + bucket)); + } catch (IOException ex) { + fail("testDoubleBufferWithMixOfTransactions failed"); + } + Assert.assertNotNull(tableBucketInfo); + + Assert.assertEquals(omBucketInfo.getVolumeName(), + tableBucketInfo.getVolumeName()); + Assert.assertEquals(omBucketInfo.getBucketName(), + tableBucketInfo.getBucketName()); + Assert.assertEquals(omBucketInfo.getCreationTime(), + tableBucketInfo.getCreationTime()); + }); + } + + private void checkDeletedBuckets(Queue + deleteBucketQueue) { + deleteBucketQueue.forEach((omBucketDeleteResponse -> { + try { + Assert.assertNull(omMetadataManager.getBucketTable().get( + omMetadataManager.getBucketKey( + omBucketDeleteResponse.getVolumeName(), + omBucketDeleteResponse.getBucketName()))); + } catch (IOException ex) { + fail("testDoubleBufferWithMixOfTransactions failed"); + } + })); + } + + public void testDoubleBuffer(int iterations, int bucketCount) + throws Exception { + try { + setup(); + for (int i = 0; i < iterations; i++) { + Daemon d1 = new Daemon(() -> + doTransactions(UUID.randomUUID().toString(), bucketCount)); + d1.start(); + } + + // We are doing +1 for volume transaction. + GenericTestUtils.waitFor(() -> + doubleBuffer.getFlushedTransactionCount() == + (bucketCount + 1) * iterations, 100, + 120000); + + Assert.assertTrue(omMetadataManager.countRowsInTable( + omMetadataManager.getVolumeTable()) == iterations); + + Assert.assertTrue(omMetadataManager.countRowsInTable( + omMetadataManager.getBucketTable()) == (bucketCount) * iterations); + + Assert.assertTrue(doubleBuffer.getFlushIterations() > 0); + } finally { + stop(); + } + } + + + public void doTransactions(String volumeName, int buckets) { + doubleBuffer.add(createVolume(volumeName), trxId.incrementAndGet()); + for (int i=0; i< buckets; i++) { + doubleBuffer.add(createBucket(volumeName, UUID.randomUUID().toString()), + trxId.incrementAndGet()); + // For every 100 buckets creation adding 100ms delay + + if (i % 100 == 0) { + try { + Thread.sleep(100); + } catch (Exception ex) { + + } + } + } + } + + private OMVolumeCreateResponse createVolume(String volumeName) { + OmVolumeArgs omVolumeArgs = + OmVolumeArgs.newBuilder() + .setAdminName(UUID.randomUUID().toString()) + .setOwnerName(UUID.randomUUID().toString()) + .setVolume(volumeName) + .setCreationTime(Time.now()).build(); + + VolumeList volumeList = VolumeList.newBuilder() + .addVolumeNames(volumeName).build(); + return new OMVolumeCreateResponse(omVolumeArgs, volumeList, + OzoneManagerProtocolProtos.OMResponse.newBuilder() + .setCmdType(OzoneManagerProtocolProtos.Type.CreateVolume) + .setStatus(OzoneManagerProtocolProtos.Status.OK).build()); + } + + private OMBucketCreateResponse createBucket(String volumeName, + String bucketName) { + OmBucketInfo omBucketInfo = + OmBucketInfo.newBuilder().setVolumeName(volumeName) + .setBucketName(bucketName).setCreationTime(Time.now()).build(); + return new OMBucketCreateResponse(omBucketInfo, + OzoneManagerProtocolProtos.OMResponse.newBuilder() + .setCmdType(OzoneManagerProtocolProtos.Type.CreateBucket) + .setStatus(OzoneManagerProtocolProtos.Status.OK).build()); + } + + private OMDummyCreateBucketResponse createDummyBucketResponse( + String volumeName, String bucketName) { + OmBucketInfo omBucketInfo = + OmBucketInfo.newBuilder().setVolumeName(volumeName) + .setBucketName(bucketName).setCreationTime(Time.now()).build(); + return new OMDummyCreateBucketResponse(omBucketInfo, + OzoneManagerProtocolProtos.OMResponse.newBuilder() + .setCmdType(OzoneManagerProtocolProtos.Type.CreateBucket) + .setStatus(OzoneManagerProtocolProtos.Status.OK).build()); + } + + private OMBucketDeleteResponse deleteBucket(String volumeName, + String bucketName) { + return new OMBucketDeleteResponse(volumeName, bucketName, + OzoneManagerProtocolProtos.OMResponse.newBuilder() + .setCmdType(OzoneManagerProtocolProtos.Type.DeleteBucket) + .setStatus(OzoneManagerProtocolProtos.Status.OK).build()); + } + + /** + * DummyCreatedBucket Response class used in testing. + */ + public static class OMDummyCreateBucketResponse extends OMClientResponse { + private final OmBucketInfo omBucketInfo; + + public OMDummyCreateBucketResponse(OmBucketInfo omBucketInfo, + OzoneManagerProtocolProtos.OMResponse omResponse) { + super(omResponse); + this.omBucketInfo = omBucketInfo; + } + + @Override + public void addToDBBatch(OMMetadataManager omMetadataManager, + BatchOperation batchOperation) throws IOException { + String dbBucketKey = + omMetadataManager.getBucketKey(omBucketInfo.getVolumeName(), + omBucketInfo.getBucketName()); + omMetadataManager.getBucketTable().putWithBatch(batchOperation, + dbBucketKey, omBucketInfo); + } + + } +} diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java index 8a8be357c8fbf..5e54e9577732a 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java @@ -31,7 +31,10 @@ import org.apache.hadoop.ozone.OmUtils; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OMNodeDetails; +import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; +import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; @@ -41,22 +44,32 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; import org.slf4j.LoggerFactory; import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.when; /** * Test OM Ratis server. */ public class TestOzoneManagerRatisServer { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + private OzoneConfiguration conf; private OzoneManagerRatisServer omRatisServer; private OzoneManagerRatisClient omRatisClient; private String omID; private String clientId = UUID.randomUUID().toString(); private static final long LEADER_ELECTION_TIMEOUT = 500L; + private OMMetadataManager omMetadataManager; + private OzoneManager ozoneManager; @Before public void init() throws Exception { @@ -80,7 +93,13 @@ public void init() throws Exception { .setOMServiceId(OzoneConsts.OM_SERVICE_ID_DEFAULT) .build(); // Starts a single node Ratis server - omRatisServer = OzoneManagerRatisServer.newOMRatisServer(conf, null, + ozoneManager = Mockito.mock(OzoneManager.class); + OzoneConfiguration ozoneConfiguration = new OzoneConfiguration(); + ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS, + folder.newFolder().getAbsolutePath()); + omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration); + when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager); + omRatisServer = OzoneManagerRatisServer.newOMRatisServer(conf, ozoneManager, omNodeDetails, Collections.emptyList()); omRatisServer.start(); omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient(omID, @@ -164,8 +183,8 @@ public void verifyRaftGroupIdGenerationWithCustomOmServiceId() throws .build(); // Starts a single node Ratis server OzoneManagerRatisServer newOmRatisServer = OzoneManagerRatisServer - .newOMRatisServer(newConf, null, - omNodeDetails, Collections.emptyList()); + .newOMRatisServer(newConf, ozoneManager, omNodeDetails, + Collections.emptyList()); newOmRatisServer.start(); OzoneManagerRatisClient newOmRatisClient = OzoneManagerRatisClient .newOzoneManagerRatisClient( diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java index a40404b18ff4e..dc1f92d52b163 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java @@ -26,7 +26,10 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OMNodeDetails; +import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; +import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; @@ -75,6 +78,8 @@ public class TestOzoneManagerStateMachine { private OzoneManagerHARequestHandler requestHandler; private RaftGroupId raftGroupId; private OzoneManagerStateMachine ozoneManagerStateMachine; + private OMMetadataManager omMetadataManager; + private OzoneManager ozoneManager; @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -97,8 +102,14 @@ public void setup() throws Exception { .setOMNodeId(omID) .setOMServiceId(OzoneConsts.OM_SERVICE_ID_DEFAULT) .build(); + ozoneManager = Mockito.mock(OzoneManager.class); + OzoneConfiguration ozoneConfiguration = new OzoneConfiguration(); + ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS, + temporaryFolder.newFolder().getAbsolutePath()); + omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration); + when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager); // Starts a single node Ratis server - omRatisServer = OzoneManagerRatisServer.newOMRatisServer(conf, null, + omRatisServer = OzoneManagerRatisServer.newOMRatisServer(conf, ozoneManager, omNodeDetails, Collections.emptyList()); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMBucketCreateRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMBucketCreateRequest.java new file mode 100644 index 0000000000000..fdd5a659a9262 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMBucketCreateRequest.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.ozone.om.request; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OMMetrics; +import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .StorageTypeProto; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.util.Time; + +import static org.mockito.Mockito.when; + +/** + * Tests OMBucketCreateRequest class, which handles CreateBucket request. + */ +public class TestOMBucketCreateRequest { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + private OzoneManager ozoneManager; + private OMMetrics omMetrics; + private OMMetadataManager omMetadataManager; + + + @Before + public void setup() throws Exception { + ozoneManager = Mockito.mock(OzoneManager.class); + omMetrics = OMMetrics.create(); + OzoneConfiguration ozoneConfiguration = new OzoneConfiguration(); + ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS, + folder.newFolder().getAbsolutePath()); + omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration); + when(ozoneManager.getMetrics()).thenReturn(omMetrics); + when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager); + } + + @After + public void stop() { + omMetrics.unRegister(); + } + + + @Test + public void testPreExecute() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + doPreExecute(volumeName, bucketName); + } + + + @Test + public void testValidateAndUpdateCache() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + + OMBucketCreateRequest omBucketCreateRequest = doPreExecute(volumeName, + bucketName); + + doValidateAndUpdateCache(volumeName, bucketName, + omBucketCreateRequest.getOmRequest()); + + } + + @Test + public void testValidateAndUpdateCacheWithNoVolume() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + + OMRequest originalRequest = createBucketRequest(bucketName, volumeName, + false, StorageTypeProto.SSD); + + OMBucketCreateRequest omBucketCreateRequest = + new OMBucketCreateRequest(originalRequest); + + String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName); + + // As we have not still called validateAndUpdateCache, get() should + // return null. + + Assert.assertNull(omMetadataManager.getBucketTable().get(bucketKey)); + + OMClientResponse omClientResponse = + omBucketCreateRequest.validateAndUpdateCache(ozoneManager, 1); + + OMResponse omResponse = omClientResponse.getOMResponse(); + Assert.assertNotNull(omResponse.getCreateBucketResponse()); + Assert.assertEquals(OzoneManagerProtocolProtos.Status.VOLUME_NOT_FOUND, + omResponse.getStatus()); + + // As request valid bucket table should not have entry. + Assert.assertNull(omMetadataManager.getBucketTable().get(bucketKey)); + } + + + @Test + public void testValidateAndUpdateCacheWithBucketAlreadyExists() + throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + + OMBucketCreateRequest omBucketCreateRequest = + doPreExecute(volumeName, bucketName); + + doValidateAndUpdateCache(volumeName, bucketName, + omBucketCreateRequest.getOmRequest()); + + // Try create same bucket again + OMClientResponse omClientResponse = + omBucketCreateRequest.validateAndUpdateCache(ozoneManager, 2); + + OMResponse omResponse = omClientResponse.getOMResponse(); + Assert.assertNotNull(omResponse.getCreateBucketResponse()); + Assert.assertEquals(OzoneManagerProtocolProtos.Status.BUCKET_ALREADY_EXISTS, + omResponse.getStatus()); + } + + + private OMBucketCreateRequest doPreExecute(String volumeName, + String bucketName) throws Exception { + addCreateVolumeToTable(volumeName, omMetadataManager); + OMRequest originalRequest = createBucketRequest(bucketName, volumeName, + false, StorageTypeProto.SSD); + + OMBucketCreateRequest omBucketCreateRequest = + new OMBucketCreateRequest(originalRequest); + + OMRequest modifiedRequest = omBucketCreateRequest.preExecute(ozoneManager); + verifyRequest(modifiedRequest, originalRequest); + return new OMBucketCreateRequest(modifiedRequest); + } + + private void doValidateAndUpdateCache(String volumeName, String bucketName, + OMRequest modifiedRequest) throws Exception { + String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName); + + // As we have not still called validateAndUpdateCache, get() should + // return null. + + Assert.assertNull(omMetadataManager.getBucketTable().get(bucketKey)); + OMBucketCreateRequest omBucketCreateRequest = + new OMBucketCreateRequest(modifiedRequest); + + + OMClientResponse omClientResponse = + omBucketCreateRequest.validateAndUpdateCache(ozoneManager, 1); + + // As now after validateAndUpdateCache it should add entry to cache, get + // should return non null value. + OmBucketInfo omBucketInfo = + omMetadataManager.getBucketTable().get(bucketKey); + Assert.assertNotNull(omMetadataManager.getBucketTable().get(bucketKey)); + + // verify table data with actual request data. + Assert.assertEquals(OmBucketInfo.getFromProtobuf( + modifiedRequest.getCreateBucketRequest().getBucketInfo()), + omBucketInfo); + + // verify OMResponse. + verifySuccessCreateBucketResponse(omClientResponse.getOMResponse()); + + } + + + private void verifyRequest(OMRequest modifiedOmRequest, + OzoneManagerProtocolProtos.OMRequest originalRequest) { + OzoneManagerProtocolProtos.BucketInfo original = + originalRequest.getCreateBucketRequest().getBucketInfo(); + OzoneManagerProtocolProtos.BucketInfo updated = + modifiedOmRequest.getCreateBucketRequest().getBucketInfo(); + + Assert.assertEquals(original.getBucketName(), updated.getBucketName()); + Assert.assertEquals(original.getVolumeName(), updated.getVolumeName()); + Assert.assertEquals(original.getIsVersionEnabled(), + updated.getIsVersionEnabled()); + Assert.assertEquals(original.getStorageType(), updated.getStorageType()); + Assert.assertEquals(original.getMetadataList(), updated.getMetadataList()); + Assert.assertNotEquals(original.getCreationTime(), + updated.getCreationTime()); + } + + public static void verifySuccessCreateBucketResponse(OMResponse omResponse) { + Assert.assertNotNull(omResponse.getCreateBucketResponse()); + Assert.assertEquals(OzoneManagerProtocolProtos.Type.CreateBucket, + omResponse.getCmdType()); + Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK, + omResponse.getStatus()); + } + + public static void addCreateVolumeToTable(String volumeName, + OMMetadataManager omMetadataManager) throws Exception { + OmVolumeArgs omVolumeArgs = + OmVolumeArgs.newBuilder().setCreationTime(Time.now()) + .setVolume(volumeName).setAdminName(UUID.randomUUID().toString()) + .setOwnerName(UUID.randomUUID().toString()).build(); + omMetadataManager.getVolumeTable().put( + omMetadataManager.getVolumeKey(volumeName), omVolumeArgs); + } + + + public static OMRequest createBucketRequest(String bucketName, + String volumeName, boolean isVersionEnabled, + StorageTypeProto storageTypeProto) { + OzoneManagerProtocolProtos.BucketInfo bucketInfo = + OzoneManagerProtocolProtos.BucketInfo.newBuilder() + .setBucketName(bucketName) + .setVolumeName(volumeName) + .setIsVersionEnabled(isVersionEnabled) + .setStorageType(storageTypeProto) + .addAllMetadata(getMetadataList()).build(); + OzoneManagerProtocolProtos.CreateBucketRequest.Builder req = + OzoneManagerProtocolProtos.CreateBucketRequest.newBuilder(); + req.setBucketInfo(bucketInfo); + return OMRequest.newBuilder().setCreateBucketRequest(req) + .setCmdType(OzoneManagerProtocolProtos.Type.CreateBucket) + .setClientId(UUID.randomUUID().toString()).build(); + } + + public static List< HddsProtos.KeyValue> getMetadataList() { + List metadataList = new ArrayList<>(); + metadataList.add(HddsProtos.KeyValue.newBuilder().setKey("key1").setValue( + "value1").build()); + metadataList.add(HddsProtos.KeyValue.newBuilder().setKey("key2").setValue( + "value2").build()); + return metadataList; + } + +} diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMBucketDeleteRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMBucketDeleteRequest.java new file mode 100644 index 0000000000000..d734bd73a0c8b --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMBucketDeleteRequest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.ozone.om.request; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OMMetrics; +import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .DeleteBucketRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMRequest; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; + +import java.util.UUID; + +import static org.mockito.Mockito.when; + +/** + * Tests OMBucketDeleteRequest class which handles DeleteBucket request. + */ +public class TestOMBucketDeleteRequest { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + private OzoneManager ozoneManager; + private OMMetrics omMetrics; + private OMMetadataManager omMetadataManager; + + + @Before + public void setup() throws Exception { + ozoneManager = Mockito.mock(OzoneManager.class); + omMetrics = OMMetrics.create(); + OzoneConfiguration ozoneConfiguration = new OzoneConfiguration(); + ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS, + folder.newFolder().getAbsolutePath()); + omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration); + when(ozoneManager.getMetrics()).thenReturn(omMetrics); + when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager); + } + + @After + public void stop() { + omMetrics.unRegister(); + } + + @Test + public void testPreExecute() throws Exception { + OMRequest omRequest = + createDeleteBucketRequest(UUID.randomUUID().toString(), + UUID.randomUUID().toString()); + + OMBucketDeleteRequest omBucketDeleteRequest = + new OMBucketDeleteRequest(omRequest); + + + Assert.assertNotEquals(omRequest, + omBucketDeleteRequest.preExecute(ozoneManager)); + } + + + @Test + public void testValidateAndUpdateCache() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + OMRequest omRequest = + createDeleteBucketRequest(volumeName, bucketName); + + OMBucketDeleteRequest omBucketDeleteRequest = + new OMBucketDeleteRequest(omRequest); + + // Create Volume and bucket entries in DB. + TestOMRequestUtils.addVolumeAndBucketCreateEntriesToDB(volumeName, + bucketName, omMetadataManager); + + omBucketDeleteRequest.validateAndUpdateCache(ozoneManager, 1); + + Assert.assertNull(omMetadataManager.getBucketTable().get( + omMetadataManager.getBucketKey(volumeName, bucketName))); + } + + + @Test + public void testValidateAndUpdateCacheFailure() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + + OMRequest omRequest = + createDeleteBucketRequest(volumeName, bucketName); + + OMBucketDeleteRequest omBucketDeleteRequest = + new OMBucketDeleteRequest(omRequest); + + + OMClientResponse omClientResponse = + omBucketDeleteRequest.validateAndUpdateCache(ozoneManager, 1); + + Assert.assertNull(omMetadataManager.getBucketTable().get( + omMetadataManager.getBucketKey(volumeName, bucketName))); + + Assert.assertEquals(OzoneManagerProtocolProtos.Status.BUCKET_NOT_FOUND, + omClientResponse.getOMResponse().getStatus()); + } + + + + + private OMRequest createDeleteBucketRequest(String volumeName, + String bucketName) { + return OMRequest.newBuilder().setDeleteBucketRequest( + DeleteBucketRequest.newBuilder() + .setBucketName(bucketName).setVolumeName(volumeName)) + .setCmdType(OzoneManagerProtocolProtos.Type.DeleteBucket) + .setClientId(UUID.randomUUID().toString()).build(); + } + +} diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMBucketSetPropertyRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMBucketSetPropertyRequest.java new file mode 100644 index 0000000000000..ece71a34b45db --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMBucketSetPropertyRequest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.ozone.om.request; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OMMetrics; +import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos. + BucketArgs; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .SetBucketPropertyRequest; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; + +import java.util.UUID; + +import static org.mockito.Mockito.when; + +/** + * Tests OMBucketSetPropertyRequest class which handles OMSetBucketProperty + * request. + */ +public class TestOMBucketSetPropertyRequest { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + private OzoneManager ozoneManager; + private OMMetrics omMetrics; + private OMMetadataManager omMetadataManager; + + + @Before + public void setup() throws Exception { + ozoneManager = Mockito.mock(OzoneManager.class); + omMetrics = OMMetrics.create(); + OzoneConfiguration ozoneConfiguration = new OzoneConfiguration(); + ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS, + folder.newFolder().getAbsolutePath()); + omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration); + when(ozoneManager.getMetrics()).thenReturn(omMetrics); + when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager); + } + + @After + public void stop() { + omMetrics.unRegister(); + } + + @Test + public void testPreExecute() throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + + + OMRequest omRequest = createSeBucketPropertyRequest(volumeName, + bucketName, true); + + OMBucketSetPropertyRequest omBucketSetPropertyRequest = + new OMBucketSetPropertyRequest(omRequest); + + Assert.assertNotEquals(omRequest, + omBucketSetPropertyRequest.preExecute(ozoneManager)); + } + + @Test + public void testValidateAndUpdateCache() throws Exception { + + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + + + OMRequest omRequest = createSeBucketPropertyRequest(volumeName, + bucketName, true); + + // Create with default BucketInfo values + TestOMRequestUtils.addVolumeAndBucketCreateEntriesToDB(volumeName, + bucketName, omMetadataManager); + + OMBucketSetPropertyRequest omBucketSetPropertyRequest = + new OMBucketSetPropertyRequest(omRequest); + + OMClientResponse omClientResponse = + omBucketSetPropertyRequest.validateAndUpdateCache(ozoneManager, 1); + + Assert.assertEquals(true, + omMetadataManager.getBucketTable().get( + omMetadataManager.getBucketKey(volumeName, bucketName)) + .getIsVersionEnabled()); + + Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK, + omClientResponse.getOMResponse().getStatus()); + + } + + @Test + public void testValidateAndUpdateCacheFails() throws Exception { + + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + + + OMRequest omRequest = createSeBucketPropertyRequest(volumeName, + bucketName, true); + + + OMBucketSetPropertyRequest omBucketSetPropertyRequest = + new OMBucketSetPropertyRequest(omRequest); + + OMClientResponse omClientResponse = + omBucketSetPropertyRequest.validateAndUpdateCache(ozoneManager, 1); + + Assert.assertEquals(OzoneManagerProtocolProtos.Status.BUCKET_NOT_FOUND, + omClientResponse.getOMResponse().getStatus()); + + Assert.assertNull(omMetadataManager.getBucketTable().get( + omMetadataManager.getBucketKey(volumeName, bucketName))); + + } + + private OMRequest createSeBucketPropertyRequest(String volumeName, + String bucketName, boolean isVersionEnabled) { + return OMRequest.newBuilder().setSetBucketPropertyRequest( + SetBucketPropertyRequest.newBuilder().setBucketArgs( + BucketArgs.newBuilder().setBucketName(bucketName) + .setVolumeName(volumeName) + .setIsVersionEnabled(isVersionEnabled).build())) + .setCmdType(OzoneManagerProtocolProtos.Type.SetBucketProperty) + .setClientId(UUID.randomUUID().toString()).build(); + } +} diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java new file mode 100644 index 0000000000000..0d5509c5087ae --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.ozone.om.request; + +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; +import org.apache.hadoop.util.Time; + +import java.util.UUID; + +/** + * Helper class to test OMRequests. + */ +public final class TestOMRequestUtils { + + private TestOMRequestUtils() { + //Do nothing + } + + /** + * Add Bucket and volume create entries in OM DB. + * @param volumeName + * @param bucketName + * @param omMetadataManager + * @throws Exception + */ + public static void addVolumeAndBucketCreateEntriesToDB(String volumeName, + String bucketName, OMMetadataManager omMetadataManager) + throws Exception { + + createVolumeEntryToDDB(volumeName, bucketName, omMetadataManager); + + OmBucketInfo omBucketInfo = + OmBucketInfo.newBuilder().setVolumeName(volumeName) + .setBucketName(bucketName).setCreationTime(Time.now()).build(); + + omMetadataManager.getBucketTable().put( + omMetadataManager.getBucketKey(volumeName, bucketName), omBucketInfo); + } + + /** + * Add Create Volume entry in OM DB. + * @param volumeName + * @param bucketName + * @param omMetadataManager + * @throws Exception + */ + public static void createVolumeEntryToDDB(String volumeName, + String bucketName, OMMetadataManager omMetadataManager) + throws Exception { + OmVolumeArgs omVolumeArgs = + OmVolumeArgs.newBuilder().setCreationTime(Time.now()) + .setVolume(volumeName).setAdminName(UUID.randomUUID().toString()) + .setOwnerName(UUID.randomUUID().toString()).build(); + omMetadataManager.getVolumeTable().put( + omMetadataManager.getVolumeKey(volumeName), omVolumeArgs); + } +} diff --git a/hadoop-ozone/ozone-manager/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/hadoop-ozone/ozone-manager/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 0000000000000..3c9e1c8a6971a --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +mock-maker-inline \ No newline at end of file