Skip to content

Commit 9d61f49

Browse files
CC-33809: Merge pull request #844 from confluentinc/fp-support
Update File rotation based on a limit when FieldPartitioner is selected.
2 parents adfea87 + 01e5d7e commit 9d61f49

File tree

5 files changed

+179
-6
lines changed

5 files changed

+179
-6
lines changed

kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,9 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig {
250250
+ "to prefix or suffix in the s3 path after the topic name."
251251
+ " None will not append the schema name in the s3 path.";
252252

253+
public static final String PARTITIONER_MAX_OPEN_FILES_CONFIG = "partitioner.max.open.files";
254+
public static final int PARTITIONER_MAX_OPEN_FILES_DEFAULT = -1;
255+
253256
private static final GenericRecommender SCHEMA_PARTITION_AFFIX_TYPE_RECOMMENDER =
254257
new GenericRecommender();
255258

@@ -913,6 +916,12 @@ public static ConfigDef newConfigDef() {
913916
"Elastic buffer initial capacity"
914917
);
915918

919+
configDef.defineInternal(
920+
PARTITIONER_MAX_OPEN_FILES_CONFIG,
921+
Type.INT,
922+
PARTITIONER_MAX_OPEN_FILES_DEFAULT,
923+
Importance.LOW
924+
);
916925
}
917926
return configDef;
918927
}

kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ public class TopicPartitionWriter {
8888
private final boolean ignoreTaggingErrors;
8989
private int recordCount;
9090
private final int flushSize;
91+
private final int partitionerMaxOpenFiles;
9192
private final long rotateIntervalMs;
9293
private final long rotateScheduleIntervalMs;
9394
private long nextScheduledRotation;
@@ -169,6 +170,8 @@ public TopicPartitionWriter(TopicPartition tp,
169170
S3SinkConnectorConfig.S3_OBJECT_BEHAVIOR_ON_TAGGING_ERROR_CONFIG)
170171
.equalsIgnoreCase(S3SinkConnectorConfig.IgnoreOrFailBehavior.IGNORE.toString());
171172
flushSize = connectorConfig.getInt(S3SinkConnectorConfig.FLUSH_SIZE_CONFIG);
173+
partitionerMaxOpenFiles = connectorConfig.getInt(
174+
S3SinkConnectorConfig.PARTITIONER_MAX_OPEN_FILES_CONFIG);
172175
topicsDir = connectorConfig.getString(StorageCommonConfig.TOPICS_DIR_CONFIG);
173176
rotateIntervalMs = connectorConfig.getLong(S3SinkConnectorConfig.ROTATE_INTERVAL_MS_CONFIG);
174177
if (rotateIntervalMs > 0 && timestampExtractor == null) {
@@ -420,6 +423,17 @@ private boolean checkRotationOrAppend(
420423
return true;
421424
}
422425

426+
if (rotateOnPartitionerMaxOpenFiles(encodedPartition)) {
427+
fileRotationTracker.incrementRotationByPartitionerMaxOpenFilesCount(encodedPartition);
428+
log.info(
429+
"Starting commit and rotation for topic partition {} with start offset {}",
430+
tp,
431+
startOffsets
432+
);
433+
nextState();
434+
return true;
435+
}
436+
423437
SinkRecord projectedRecord = compatibility.project(record, null, currentValueSchema);
424438
boolean validRecord = writeRecord(projectedRecord, encodedPartition, record);
425439
buffer.poll();
@@ -442,6 +456,19 @@ private boolean checkRotationOrAppend(
442456
return false;
443457
}
444458

459+
private boolean rotateOnPartitionerMaxOpenFiles(String encodedPartition) {
460+
if (partitionerMaxOpenFiles == -1) {
461+
return false;
462+
}
463+
464+
boolean rotate = !commitFiles.containsKey(encodedPartition)
465+
&& commitFiles.size() == partitionerMaxOpenFiles;
466+
log.trace("Should apply rotation on max open files for topic-partition '{}': "
467+
+ "(partitionerMaxOpenFiles: '{}', commitFiles.size(): '{}')? {}",
468+
tp, partitionerMaxOpenFiles, commitFiles.size(), rotate);
469+
return rotate;
470+
}
471+
445472
private void commitOnTimeIfNoData(long now) {
446473
if (buffer.isEmpty()) {
447474
// committing files after waiting for rotateIntervalMs time but less than flush.size

kafka-connect-s3/src/main/java/io/confluent/connect/s3/util/FileRotationTracker.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ private static final class RotationMetrics {
2727

2828
int rotationByFlushSize = 0;
2929

30+
int rotationByPartitionerMaxFiles = 0;
31+
3032
int rotationByRotationInterval = 0;
3133

3234
int rotationByScheduledRotationInterval = 0;
@@ -69,6 +71,10 @@ public void incrementRotationByFlushSizeCount() {
6971
rotationByFlushSize++;
7072
}
7173

74+
public void incrementRotationByPartitionerMaxOpenFilesCount() {
75+
rotationByPartitionerMaxFiles++;
76+
}
77+
7278
public void incrementRotationByRotationIntervalCount() {
7379
rotationByRotationInterval++;
7480
}
@@ -97,6 +103,13 @@ public void incrementRotationByFlushSizeCount(String outputPartition) {
97103
metrics.get(outputPartition).incrementRotationByFlushSizeCount();
98104
}
99105

106+
public void incrementRotationByPartitionerMaxOpenFilesCount(String outputPartition) {
107+
if (!metrics.containsKey(outputPartition)) {
108+
metrics.put(outputPartition, new RotationMetrics());
109+
}
110+
metrics.get(outputPartition).incrementRotationByPartitionerMaxOpenFilesCount();
111+
}
112+
100113
public void incrementRotationByRotationIntervalCount(String outputPartition) {
101114
if (!metrics.containsKey(outputPartition)) {
102115
metrics.put(outputPartition, new RotationMetrics());
@@ -135,6 +148,8 @@ public String toString() {
135148
sb.append(rotationMetrics.rotationByScheduledRotationInterval);
136149
sb.append(", RotationByFlushSize: ");
137150
sb.append(rotationMetrics.rotationByFlushSize);
151+
sb.append(", RotationByPartitionerMaxFiles: ");
152+
sb.append(rotationMetrics.rotationByPartitionerMaxFiles);
138153
sb.append(", RotationByDiffName: ");
139154
sb.append(rotationMetrics.rotationByDiffName);
140155
sb.append(", RotationByDiffSchema: ");

kafka-connect-s3/src/test/java/io/confluent/connect/s3/TopicPartitionWriterTest.java

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@
9999
import org.slf4j.Logger;
100100
import org.slf4j.LoggerFactory;
101101

102+
import static io.confluent.connect.s3.S3SinkConnectorConfig.PARTITIONER_MAX_OPEN_FILES_CONFIG;
102103
import static io.confluent.connect.s3.S3SinkConnectorConfig.SCHEMA_PARTITION_AFFIX_TYPE_CONFIG;
103104
import static io.confluent.connect.s3.util.Utils.sinkRecordToLoggableString;
104105
import static io.confluent.connect.storage.StorageSinkConnectorConfig.FLUSH_SIZE_CONFIG;
@@ -658,6 +659,125 @@ public void testWriteRecordTimeBasedPartitionRecordTimestampHoursOutOfOrder() th
658659
verify(expectedFiles, 2, schema, records);
659660
}
660661

662+
@Test
663+
public void testWriteRecordFieldBasedPartitionAndRotateOnMaxSize() throws Exception {
664+
localProps.put(PARTITIONER_MAX_OPEN_FILES_CONFIG, "5");
665+
localProps.put(FLUSH_SIZE_CONFIG, "10");
666+
setUp();
667+
668+
// Define the partitioner
669+
Partitioner<?> partitioner = new FieldPartitioner<>();
670+
partitioner.configure(parsedConfig);
671+
672+
TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(
673+
TOPIC_PARTITION, storage, writerProvider, partitioner, connectorConfig, context, null);
674+
675+
String key = "key";
676+
Schema schema = createSchema();
677+
List<Struct> records = createRecordBatches(schema, 6, 1);
678+
679+
Collection<SinkRecord> sinkRecords = createSinkRecords(records, key, schema);
680+
681+
for (SinkRecord record : sinkRecords) {
682+
topicPartitionWriter.buffer(record);
683+
}
684+
685+
// Test actual write
686+
topicPartitionWriter.write();
687+
topicPartitionWriter.close();
688+
689+
@SuppressWarnings("unchecked")
690+
List<String> partitionFields = (List<String>) parsedConfig.get(PARTITION_FIELD_NAME_CONFIG);
691+
String partitionField = partitionFields.get(0);
692+
693+
List<String> expectedFiles = new ArrayList<>();
694+
for (int i = 0; i < 5; i++) {
695+
String dirPrefix = partitioner.generatePartitionedPath(TOPIC, partitionField + "=" + String.valueOf(16 + i));
696+
expectedFiles.add(FileUtils.fileKeyToCommit(topicsDir, dirPrefix, TOPIC_PARTITION, i, extension, ZERO_PAD_FMT));
697+
}
698+
verify(expectedFiles, 1, schema, records);
699+
}
700+
701+
@Test
702+
public void testWriteRecordFieldBasedPartitionAndRotateOnMaxSizeWithMultipleFields() throws Exception {
703+
localProps.put(PARTITIONER_MAX_OPEN_FILES_CONFIG, "5");
704+
localProps.put(FLUSH_SIZE_CONFIG, "10");
705+
setUp();
706+
707+
// Define the partitioner
708+
Partitioner<?> partitioner = new FieldPartitioner<>();
709+
parsedConfig.put(PARTITION_FIELD_NAME_CONFIG, Arrays.asList("int", "boolean"));
710+
partitioner.configure(parsedConfig);
711+
712+
TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(
713+
TOPIC_PARTITION, storage, writerProvider, partitioner, connectorConfig, context, null);
714+
715+
String key = "key";
716+
Schema schema = createSchema();
717+
List<Struct> records = createRecordBatches(schema, 6, 1);
718+
719+
Collection<SinkRecord> sinkRecords = createSinkRecords(records, key, schema);
720+
721+
for (SinkRecord record : sinkRecords) {
722+
topicPartitionWriter.buffer(record);
723+
}
724+
725+
// Test actual write
726+
topicPartitionWriter.write();
727+
topicPartitionWriter.close();
728+
729+
@SuppressWarnings("unchecked")
730+
List<String> partitionFields = (List<String>) parsedConfig.get(PARTITION_FIELD_NAME_CONFIG);
731+
String partitionField = partitionFields.get(0);
732+
733+
List<String> expectedFiles = new ArrayList<>();
734+
for (int i = 0; i < 5; i++) {
735+
String dirPrefix = partitioner.generatePartitionedPath(TOPIC, partitionField + "=" + String.valueOf(16 + i));
736+
dirPrefix = partitioner.generatePartitionedPath(dirPrefix, "boolean=true");
737+
expectedFiles.add(FileUtils.fileKeyToCommit(topicsDir, dirPrefix, TOPIC_PARTITION, i, extension, ZERO_PAD_FMT));
738+
}
739+
verify(expectedFiles, 1, schema, records);
740+
}
741+
742+
@Test
743+
public void testWriteRecordFieldBasedPartitionAndRotateOnFlushSizeWhenMaxFilesIsUnset() throws Exception {
744+
localProps.put(PARTITIONER_MAX_OPEN_FILES_CONFIG, "-1");
745+
localProps.put(FLUSH_SIZE_CONFIG, "10");
746+
setUp();
747+
748+
// Define the partitioner
749+
Partitioner<?> partitioner = new FieldPartitioner<>();
750+
partitioner.configure(parsedConfig);
751+
752+
TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(
753+
TOPIC_PARTITION, storage, writerProvider, partitioner, connectorConfig, context, null);
754+
755+
String key = "key";
756+
Schema schema = createSchema();
757+
List<Struct> records = createRecordBatches(schema, 1, 20);
758+
759+
Collection<SinkRecord> sinkRecords = createSinkRecords(records, key, schema);
760+
761+
for (SinkRecord record : sinkRecords) {
762+
topicPartitionWriter.buffer(record);
763+
}
764+
765+
// Test actual write
766+
topicPartitionWriter.write();
767+
topicPartitionWriter.close();
768+
769+
@SuppressWarnings("unchecked")
770+
List<String> partitionFields = (List<String>) parsedConfig.get(PARTITION_FIELD_NAME_CONFIG);
771+
String partitionField = partitionFields.get(0);
772+
773+
List<String> expectedFiles = new ArrayList<>();
774+
String dirPrefix = partitioner.generatePartitionedPath(TOPIC, partitionField + "=" + String.valueOf(16));
775+
expectedFiles.add(FileUtils.fileKeyToCommit(topicsDir, dirPrefix, TOPIC_PARTITION, 0, extension, ZERO_PAD_FMT));
776+
expectedFiles.add(FileUtils.fileKeyToCommit(topicsDir, dirPrefix, TOPIC_PARTITION, 10, extension, ZERO_PAD_FMT));
777+
778+
verify(expectedFiles, 10, schema, records);
779+
}
780+
661781
@Test
662782
public void testWriteRecordTimeBasedPartitionRecordTimestampHoursOutOfOrderAndRotateOnPartitionChange() throws Exception {
663783
localProps.put(FLUSH_SIZE_CONFIG, "1000");

kafka-connect-s3/src/test/java/io/confluent/connect/s3/util/FileRotationTrackerTest.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,14 @@ public void testRotationIncrements() {
1717
FileRotationTracker fileRotationTracker = new FileRotationTracker();
1818
fileRotationTracker.incrementRotationByRotationIntervalCount(FILE_1);
1919
fileRotationTracker.incrementRotationByFlushSizeCount(FILE_2);
20+
fileRotationTracker.incrementRotationByPartitionerMaxOpenFilesCount(FILE_2);
2021
fileRotationTracker
2122
.incrementRotationBySchemaChangeCount(FILE_3, SchemaIncompatibilityType.DIFFERENT_VERSION);
2223
fileRotationTracker.incrementRotationByFlushSizeCount(FILE_1);
2324
String actual = fileRotationTracker.toString();
24-
String expected = "OutputPartition: file2, RotationByInterval: 0, RotationByScheduledInterval: 0, RotationByFlushSize: 1, RotationByDiffName: 0, RotationByDiffSchema: 0, RotationByDiffType: 0, RotationByDiffVersion: 0, RotationByDiffParams: 0, RotationByNullSchema: 0\n"
25-
+ "OutputPartition: file3, RotationByInterval: 0, RotationByScheduledInterval: 0, RotationByFlushSize: 0, RotationByDiffName: 0, RotationByDiffSchema: 0, RotationByDiffType: 0, RotationByDiffVersion: 1, RotationByDiffParams: 0, RotationByNullSchema: 0\n"
26-
+ "OutputPartition: file1, RotationByInterval: 1, RotationByScheduledInterval: 0, RotationByFlushSize: 1, RotationByDiffName: 0, RotationByDiffSchema: 0, RotationByDiffType: 0, RotationByDiffVersion: 0, RotationByDiffParams: 0, RotationByNullSchema: 0\n";
25+
String expected = "OutputPartition: file2, RotationByInterval: 0, RotationByScheduledInterval: 0, RotationByFlushSize: 1, RotationByPartitionerMaxFiles: 1, RotationByDiffName: 0, RotationByDiffSchema: 0, RotationByDiffType: 0, RotationByDiffVersion: 0, RotationByDiffParams: 0, RotationByNullSchema: 0\n"
26+
+ "OutputPartition: file3, RotationByInterval: 0, RotationByScheduledInterval: 0, RotationByFlushSize: 0, RotationByPartitionerMaxFiles: 0, RotationByDiffName: 0, RotationByDiffSchema: 0, RotationByDiffType: 0, RotationByDiffVersion: 1, RotationByDiffParams: 0, RotationByNullSchema: 0\n"
27+
+ "OutputPartition: file1, RotationByInterval: 1, RotationByScheduledInterval: 0, RotationByFlushSize: 1, RotationByPartitionerMaxFiles: 0, RotationByDiffName: 0, RotationByDiffSchema: 0, RotationByDiffType: 0, RotationByDiffVersion: 0, RotationByDiffParams: 0, RotationByNullSchema: 0\n";
2728
Assert.assertEquals(expected, actual);
2829
}
2930

@@ -32,13 +33,14 @@ public void testRotationCountClearance() {
3233
FileRotationTracker fileRotationTracker = new FileRotationTracker();
3334
fileRotationTracker.incrementRotationByRotationIntervalCount(FILE_1);
3435
fileRotationTracker.incrementRotationByFlushSizeCount(FILE_2);
36+
fileRotationTracker.incrementRotationByPartitionerMaxOpenFilesCount(FILE_3);
3537
fileRotationTracker
3638
.incrementRotationBySchemaChangeCount(FILE_3, SchemaIncompatibilityType.DIFFERENT_VERSION);
3739
fileRotationTracker.incrementRotationByFlushSizeCount(FILE_1);
3840
String actual = fileRotationTracker.toString();
39-
String expected = "OutputPartition: file2, RotationByInterval: 0, RotationByScheduledInterval: 0, RotationByFlushSize: 1, RotationByDiffName: 0, RotationByDiffSchema: 0, RotationByDiffType: 0, RotationByDiffVersion: 0, RotationByDiffParams: 0, RotationByNullSchema: 0\n"
40-
+ "OutputPartition: file3, RotationByInterval: 0, RotationByScheduledInterval: 0, RotationByFlushSize: 0, RotationByDiffName: 0, RotationByDiffSchema: 0, RotationByDiffType: 0, RotationByDiffVersion: 1, RotationByDiffParams: 0, RotationByNullSchema: 0\n"
41-
+ "OutputPartition: file1, RotationByInterval: 1, RotationByScheduledInterval: 0, RotationByFlushSize: 1, RotationByDiffName: 0, RotationByDiffSchema: 0, RotationByDiffType: 0, RotationByDiffVersion: 0, RotationByDiffParams: 0, RotationByNullSchema: 0\n";
41+
String expected = "OutputPartition: file2, RotationByInterval: 0, RotationByScheduledInterval: 0, RotationByFlushSize: 1, RotationByPartitionerMaxFiles: 0, RotationByDiffName: 0, RotationByDiffSchema: 0, RotationByDiffType: 0, RotationByDiffVersion: 0, RotationByDiffParams: 0, RotationByNullSchema: 0\n"
42+
+ "OutputPartition: file3, RotationByInterval: 0, RotationByScheduledInterval: 0, RotationByFlushSize: 0, RotationByPartitionerMaxFiles: 1, RotationByDiffName: 0, RotationByDiffSchema: 0, RotationByDiffType: 0, RotationByDiffVersion: 1, RotationByDiffParams: 0, RotationByNullSchema: 0\n"
43+
+ "OutputPartition: file1, RotationByInterval: 1, RotationByScheduledInterval: 0, RotationByFlushSize: 1, RotationByPartitionerMaxFiles: 0, RotationByDiffName: 0, RotationByDiffSchema: 0, RotationByDiffType: 0, RotationByDiffVersion: 0, RotationByDiffParams: 0, RotationByNullSchema: 0\n";
4244
Assert.assertEquals(expected, actual);
4345
fileRotationTracker.clear();
4446
actual = fileRotationTracker.toString();

0 commit comments

Comments
 (0)