Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1316,19 +1316,52 @@ private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily

switch (compactType) {
case MOB:
addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
addListener(getDescriptor(tableName), (tableDesc, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else {
future.complete(ret);
}
});
List<ColumnFamilyDescriptor> mobColumnFamilies =
Arrays.stream(tableDesc.getColumnFamilies())
.filter(ColumnFamilyDescriptor::isMobEnabled).toList();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think toList is available in java 8?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the late reply! a little busy recently.
Yes, my fault. Thanks for spotting that; I'll update it later.

if (mobColumnFamilies.isEmpty()) {
// It's not a mob table, nothing to do.
future.complete(null);
return;
}
if (columnFamily == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems here we added bunch of code related to MOB feature? What is the implementation on branch-2 for sync HBaseAdmin?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Branch-2 has the same code logic as here, I also verified this on HBase 2.6.2, and the same error occurs.

hbase:013:0>  compact 't1', 'info', 'MOB'

ERROR: org.apache.hadoop.hbase.NotServingRegionException: t1,.mob,0.78e317a6e78a0fceb27b9fa0cb9dcf5b. is not online on hbase001,16000,1762860032382
        at org.apache.hadoop.hbase.regionserver.HRegionServer.getRegionByEncodedName(HRegionServer.java:3675)
        at org.apache.hadoop.hbase.regionserver.HRegionServer.getRegion(HRegionServer.java:3653)
        at org.apache.hadoop.hbase.regionserver.RSRpcServices.getRegion(RSRpcServices.java:1497)
        at org.apache.hadoop.hbase.regionserver.RSRpcServices.compactRegion(RSRpcServices.java:1703)
        at org.apache.hadoop.hbase.master.MasterRpcServices.compactRegion(MasterRpcServices.java:1875)
        at org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos$AdminService$2.callBlockingMethod(AdminProtos.java:34087)
        at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:443)
        at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:124)
        at org.apache.hadoop.hbase.ipc.RpcHandler.run(RpcHandler.java:105)
        at org.apache.hadoop.hbase.ipc.RpcHandler.run(RpcHandler.java:85)

For usage try 'help "compact"'

Took 0.0270 seconds
hbase:014:0> version
2.6.2, r6b3b36b429cf9a9d74110de79eb3b327b29ebf17, Fri Feb 14 14:39:16 UTC 2025
Took 0.0000 seconds

CompletableFuture<?>[] completableFutures = mobColumnFamilies.stream()
.map(cfd -> compact(tableName, cfd.getName(), major, CompactType.NORMAL))
.toArray(CompletableFuture<?>[]::new);
addListener(CompletableFuture.allOf(completableFutures), (ret1, err1) -> {
if (err1 != null) {
future.completeExceptionally(err1);
} else {
future.complete(ret1);
}
});
return;
}
ColumnFamilyDescriptor cfd = tableDesc.getColumnFamily(columnFamily);
if (cfd == null) {
future.completeExceptionally(
new NoSuchColumnFamilyException("Column family " + Bytes.toString(columnFamily)
+ " does not exist in table " + tableName.getNameAsString()));
return;
}
if (cfd.isMobEnabled()) {
addListener(compact(tableName, cfd.getName(), major, CompactType.NORMAL),
(ret2, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else {
future.complete(ret2);
}
});
} else {
// The specified column family is not a mob column family, nothing to do.
future.complete(null);
}
});
break;
case NORMAL:
Expand All @@ -1339,6 +1372,7 @@ private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily
}
if (locations == null || locations.isEmpty()) {
future.completeExceptionally(new TableNotFoundException(tableName));
return;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In here, return early to prevent further execution when locations is null

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be a separated bug? We can fix this first?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I'll open a new PR for this issue, thanks!

}
CompletableFuture<?>[] compactFutures =
locations.stream().filter(l -> l.getRegion() != null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

@Category({ ClientTests.class, MediumTests.class })
public class TestMobCompactFromClient {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMobCompactFromClient.class);

@Rule
public TestName name = new TestName();

private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static final String FAMILY = "info";
private static final String MOB_FAMILY = "mob_info";
private static final String QUALIFIER = "q";

private static Admin admin;

@Before
public void setup() throws Exception {
TEST_UTIL.startMiniCluster(1);
admin = TEST_UTIL.getAdmin();
}

@After
public void tearDown() throws IOException {
admin.close();
TEST_UTIL.shutdownMiniCluster();
TEST_UTIL.getTestFileSystem().delete(TEST_UTIL.getDataTestDir(), true);
}

@Test
public void testCompactMobTableFromClientSize() throws Exception {
TableName tableName = TableName.valueOf(name.getMethodName());
TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(tableName);
tableBuilder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(MOB_FAMILY))
.setMobEnabled(true).setMobThreshold(100L).build());
tableBuilder
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(FAMILY)).build());
admin.createTable(tableBuilder.build());

assertTrue(admin.tableExists(tableName));

try (Table table = admin.getConnection().getTable(tableName)) {
// Put some data && flush the table
for (int i = 0; i < 5; i++) {
Put put = new Put(Bytes.toBytes("row" + i));
put.addColumn(Bytes.toBytes(MOB_FAMILY), Bytes.toBytes(QUALIFIER), makeDummyData(500));
put.addColumn(Bytes.toBytes(FAMILY), Bytes.toBytes(QUALIFIER), makeDummyData(10));
table.put(put);
admin.flush(tableName);
}

List<RegionInfo> regionInfos = admin.getRegions(tableName);
assertEquals(1, regionInfos.size());
RegionInfo regionInfo = regionInfos.get(0);
HRegion region =
TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionInfo.getEncodedName());
HStore store1 = region.getStore(Bytes.toBytes(MOB_FAMILY));
assertNotNull(store1);
HStore store2 = region.getStore(Bytes.toBytes(FAMILY));
assertNotNull(store2);

assertEquals(5, store1.getStorefilesCount());
assertEquals(5, store2.getStorefilesCount());

admin.compact(tableName, Bytes.toBytes(MOB_FAMILY), CompactType.MOB);
Thread.sleep(1000);
int retry = 5;
while (
admin.getCompactionState(tableName, CompactType.MOB) != CompactionState.NONE && retry > 0
) {
Thread.sleep(1000);
retry--;
}
assertEquals(CompactionState.NONE, admin.getCompactionState(tableName, CompactType.MOB));

int store1fileCount = store1.getStorefilesCount();
int store2fileCount = store2.getStorefilesCount();
assertTrue(store1fileCount < 5);
assertEquals(5, store2fileCount);

// Put some data && flush the table
for (int i = 5; i < 10; i++) {
Put put = new Put(Bytes.toBytes("row" + i));
put.addColumn(Bytes.toBytes(MOB_FAMILY), Bytes.toBytes(QUALIFIER), makeDummyData(500));
put.addColumn(Bytes.toBytes(FAMILY), Bytes.toBytes(QUALIFIER), makeDummyData(10));
table.put(put);
admin.flush(tableName);
}
assertEquals(store1fileCount + 5, store1.getStorefilesCount());
assertEquals(store2fileCount + 5, store2.getStorefilesCount());

admin.compact(tableName, CompactType.MOB);
Thread.sleep(1000);
retry = 5;
while (
admin.getCompactionState(tableName, CompactType.MOB) != CompactionState.NONE && retry > 0
) {
Thread.sleep(1000);
retry--;
}
assertEquals(CompactionState.NONE, admin.getCompactionState(tableName, CompactType.MOB));

assertTrue(store1.getStorefilesCount() < (store1fileCount + 5));
assertEquals(store2fileCount + 5, store2.getStorefilesCount());
} finally {
TEST_UTIL.deleteTable(tableName);
}
}

@Test
public void testMajorCompactMobTableFromClientSize() throws Exception {
TableName tableName = TableName.valueOf(name.getMethodName());
TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(tableName);
tableBuilder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(MOB_FAMILY))
.setMobEnabled(true).setMobThreshold(100L).build());
tableBuilder
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(FAMILY)).build());
admin.createTable(tableBuilder.build());

assertTrue(admin.tableExists(tableName));

try (Table table = admin.getConnection().getTable(tableName)) {
// Put some data && flush the table
for (int i = 0; i < 5; i++) {
Put put = new Put(Bytes.toBytes("row" + i));
put.addColumn(Bytes.toBytes(MOB_FAMILY), Bytes.toBytes(QUALIFIER), makeDummyData(500));
put.addColumn(Bytes.toBytes(FAMILY), Bytes.toBytes(QUALIFIER), makeDummyData(10));
table.put(put);
admin.flush(tableName);
}

List<RegionInfo> regionInfos = admin.getRegions(tableName);
assertEquals(1, regionInfos.size());
RegionInfo regionInfo = regionInfos.get(0);
HRegion region =
TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionInfo.getEncodedName());
HStore store1 = region.getStore(Bytes.toBytes(MOB_FAMILY));
assertNotNull(store1);
HStore store2 = region.getStore(Bytes.toBytes(FAMILY));
assertNotNull(store2);
assertEquals(5, store1.getStorefilesCount());
assertEquals(5, store2.getStorefilesCount());

admin.majorCompact(tableName, CompactType.MOB);
Thread.sleep(1000);
int retry = 5;
while (
admin.getCompactionState(tableName, CompactType.MOB) != CompactionState.NONE && retry > 0
) {
Thread.sleep(1000);
retry--;
}
assertEquals(CompactionState.NONE, admin.getCompactionState(tableName, CompactType.MOB));

assertEquals(1, store1.getStorefilesCount());
assertEquals(5, store2.getStorefilesCount());

// Put some data && flush the table
for (int i = 5; i < 10; i++) {
Put put = new Put(Bytes.toBytes("row" + i));
put.addColumn(Bytes.toBytes(MOB_FAMILY), Bytes.toBytes(QUALIFIER), makeDummyData(500));
put.addColumn(Bytes.toBytes(FAMILY), Bytes.toBytes(QUALIFIER), makeDummyData(10));
table.put(put);
admin.flush(tableName);
}
assertEquals(6, store1.getStorefilesCount());
assertEquals(10, store2.getStorefilesCount());

admin.compact(tableName, CompactType.MOB);
Thread.sleep(1000);
retry = 5;
while (
admin.getCompactionState(tableName, CompactType.MOB) != CompactionState.NONE && retry > 0
) {
Thread.sleep(1000);
retry--;
}
assertEquals(CompactionState.NONE, admin.getCompactionState(tableName, CompactType.MOB));

assertEquals(1, store1.getStorefilesCount());
assertEquals(10, store2.getStorefilesCount());
} finally {
TEST_UTIL.deleteTable(tableName);
}
}

@Test
public void testCompactMobTableWithNonFamilyFromClientSize() throws IOException {
TableName tableName = TableName.valueOf(name.getMethodName());
TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(tableName);
tableBuilder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(MOB_FAMILY))
.setMobEnabled(true).setMobThreshold(100L).build());
TableDescriptor tableDescriptor = tableBuilder.build();
admin.createTable(tableDescriptor);

assertTrue(admin.tableExists(tableName));

try (Table table = admin.getConnection().getTable(tableName)) {
// Put some data && flush the table
for (int i = 0; i < 5; i++) {
Put put = new Put(Bytes.toBytes("row" + i));
put.addColumn(Bytes.toBytes(MOB_FAMILY), Bytes.toBytes(QUALIFIER), makeDummyData(500));
table.put(put);
admin.flush(tableName);
}
assertFalse(tableDescriptor.hasColumnFamily(Bytes.toBytes(FAMILY)));
assertThrows(NoSuchColumnFamilyException.class,
() -> admin.compact(tableName, Bytes.toBytes(FAMILY), CompactType.MOB));
} finally {
TEST_UTIL.deleteTable(tableName);
}
}

private byte[] makeDummyData(int size) {
byte[] dummyData = new byte[size];
Bytes.random(dummyData);
return dummyData;
}
}