
Commit 57f719c

fix parquet batching

Parent: 7adc757

10 files changed: +374 additions, -86 deletions

src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/AbstractFileReader.java

Lines changed: 2 additions & 1 deletion

@@ -76,8 +76,9 @@ protected void setOffset(long offset) {
     public final boolean hasNext() {
         checkClosed();
         try {
-            return (batchSize <= 0 || offset == 0 || offset % batchSize != 0 || (offset % batchSize == 0 && seeked)) &&
+            boolean hasNext = (batchSize <= 0 || offset == 0 || offset % batchSize != 0 || (offset % batchSize == 0 && seeked)) &&
                     hasNextRecord();
+            return hasNext;
         } catch (ConnectException ce) {
             throw ce;
         } catch (Exception e) {
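This change only extracts the gating expression into a local variable (convenient to inspect while debugging); the logic is untouched. The gate is what drives batching: hasNext() is the gate ANDed with hasNextRecord(), so it reports false whenever the offset sits exactly on a batch boundary, unless a seek (presumably what sets the seeked flag) has just re-opened the batch. A minimal sketch of the gate, assuming batchSize, offset and seeked mean the same as the reader's fields (batchGateOpen is a hypothetical name, not part of the codebase):

    // Sketch only: the real expression lives inline in AbstractFileReader.hasNext().
    static boolean batchGateOpen(long batchSize, long offset, boolean seeked) {
        return batchSize <= 0              // batching disabled: gate always open
                || offset == 0             // nothing consumed yet
                || offset % batchSize != 0 // still inside the current batch
                || seeked;                 // on a boundary, but a seek re-opened it
    }

Note the original's last disjunct, (offset % batchSize == 0 && seeked), is only reached when offset % batchSize == 0 (the previous disjunct has already failed), so it reduces to seeked as written above.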

src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/ParquetFileReader.java

Lines changed: 2 additions & 1 deletion

@@ -90,7 +90,8 @@ public void seekFile(long offset) throws IOException {
             this.closed = false;
             setOffset(0);
         }
-        while (hasNext() && currentOffset() < offset) {
+
+        while (hasNextRecord() && currentOffset() < offset) {
            nextRecord();
        }
    }
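This is the substantive fix. seekFile() previously looped on hasNext(), which applies the batch gate shown above, so with batching enabled the seek stopped as soon as it landed on a batch boundary and never reached the requested offset. hasNextRecord() queries the underlying Parquet reader directly and ignores the gate. A hypothetical repro of the old behavior, assuming FILE_READER_BATCH_SIZE is 10 and the file holds more than 15 records:

    // Hypothetical repro; 10 and 15 are illustrative values.
    reader.seekFile(15);
    // Old loop: hasNext() turns false at offset 10 (a batch boundary),
    // so the seek stopped 5 records short of the target.
    // New loop: hasNextRecord() keeps advancing until currentOffset() == 15.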
src/test/java/com/github/mmolimar/kafka/connect/fs/InMemoryFsContext.java (new file; the page omits this path, so it is inferred from the package and class names; the src/test root is an assumption)

Lines changed: 27 additions & 0 deletions

@@ -0,0 +1,27 @@
+package com.github.mmolimar.kafka.connect.fs;
+
+import org.apache.kafka.connect.source.SourceTaskContext;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
+
+import java.util.Map;
+
+public class InMemoryFsContext implements SourceTaskContext {
+
+    public OffsetStorageReader reader;
+    public FsSourceTaskConfig configs;
+
+    public InMemoryFsContext() {
+        reader = new InMemoryFsOffsetStorageReader();
+    }
+
+    @Override
+    public Map<String, String> configs() {
+        System.out.println("TURNS OUT YOU PROBABLY NEED TO IMPLEMENT THIS.");
+        return null;
+    }
+
+    @Override
+    public OffsetStorageReader offsetStorageReader() {
+        return reader;
+    }
+}
src/test/java/com/github/mmolimar/kafka/connect/fs/InMemoryFsOffsetStorageReader.java (new file; inferred path)

Lines changed: 26 additions & 0 deletions

@@ -0,0 +1,26 @@
+package com.github.mmolimar.kafka.connect.fs;
+
+import org.apache.kafka.connect.storage.OffsetStorageReader;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public class InMemoryFsOffsetStorageReader implements OffsetStorageReader {
+    @Override
+    public <T> Map<String, Object> offset(Map<String, T> partition) {
+        return InMemoryOffsetStore.Offsets.get(partition);
+    }
+
+    @Override
+    public <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<String, T>> partitions) {
+        Map<Map<String, T>, Map<String, Object>> offsets = new HashMap<>();
+        for (Map.Entry<Map<String, Object>, Map<String, Object>> entry : InMemoryOffsetStore.Offsets.entrySet()) {
+            if (partitions.contains(entry.getKey())) {
+
+                offsets.put((Map<String, T>) entry.getKey(), entry.getValue());
+            }
+        }
+        return offsets;
+    }
+}
src/test/java/com/github/mmolimar/kafka/connect/fs/InMemoryFsSourceTask.java (new file; inferred path)

Lines changed: 32 additions & 0 deletions

@@ -0,0 +1,32 @@
+package com.github.mmolimar.kafka.connect.fs;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class InMemoryFsSourceTask extends FsSourceTask {
+
+    private List<SourceRecord> batchRecords = new ArrayList<>();
+
+    @Override
+    public void commit() throws InterruptedException {
+        if (batchRecords == null) return;
+        batchRecords.forEach(sourceRecord -> InMemoryOffsetStore.Offsets.put(
+                (Map<String, Object>) sourceRecord.sourcePartition(),
+                (Map<String, Object>) sourceRecord.sourceOffset()
+        ));
+    }
+
+    @Override
+    public List<SourceRecord> poll() {
+        List<SourceRecord> records = super.poll();
+        batchRecords = records;
+        try {
+            this.commit();
+        } catch (InterruptedException exception) {}
+
+        return records;
+    }
+}
src/test/java/com/github/mmolimar/kafka/connect/fs/InMemoryOffsetStore.java (new file; inferred path)

Lines changed: 8 additions & 0 deletions

@@ -0,0 +1,8 @@
+package com.github.mmolimar.kafka.connect.fs;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class InMemoryOffsetStore {
+    public static Map<Map<String, Object>, Map<String, Object>> Offsets = new HashMap<>();
+}
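Taken together, these four classes are an in-memory test double for Kafka Connect's offset machinery: InMemoryFsSourceTask copies each polled record's source partition and offset into the static InMemoryOffsetStore map (note that poll() commits eagerly, whereas Connect normally calls commit() only after offsets are flushed), InMemoryFsOffsetStorageReader serves those offsets back, and InMemoryFsContext hands that reader to the task. InMemoryFsContext.configs() still returns null, as its println loudly admits, so anything reading task configs through the context will break. A sketch of how the pieces wire together; the props contents are hypothetical, and a real run needs the connector's settings (fs.uris and friends):

    // Sketch of the intended wiring; props contents are placeholders.
    Map<String, String> props = new HashMap<>();
    InMemoryFsSourceTask task = new InMemoryFsSourceTask();
    task.initialize(new InMemoryFsContext()); // context supplies InMemoryFsOffsetStorageReader
    task.start(props);
    List<SourceRecord> records = task.poll(); // poll() also writes offsets into InMemoryOffsetStore
    task.stop();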

src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/ParquetFileReaderTest.java

Lines changed: 32 additions & 0 deletions

@@ -1,5 +1,6 @@
 package com.github.mmolimar.kafka.connect.fs.file.reader;
 
+import com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig;
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
@@ -26,6 +27,7 @@
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.UUID;
 import java.util.stream.IntStream;
 
@@ -180,6 +182,36 @@ public void readerWithUnparseableSchema(ReaderFsTestConfig fsConfig) {
         });
     }
 
+    @ParameterizedTest
+    @MethodSource("fileSystemConfigProvider")
+    public void readAllDataInBatches(ReaderFsTestConfig fsConfig) {
+        Map<String, Object> readerConfig = getReaderConfig();
+        int batchSize = 10;
+        readerConfig.put(ParquetFileReader.FILE_READER_PARQUET_SCHEMA, readerSchema.toString());
+        readerConfig.put(AgnosticFileReader.FILE_READER_AGNOSTIC_EXTENSIONS_PARQUET, getFileExtension());
+        readerConfig.put(FsSourceTaskConfig.FILE_READER_BATCH_SIZE, batchSize);
+
+        AbstractFileReader<?> reader = (AbstractFileReader<?>) getReader(fsConfig.getFs(), fsConfig.getDataFile(), readerConfig);
+        assertTrue(reader.hasNext());
+
+        int recordCount = 0;
+        int batchCount = 0;
+
+        while (reader.hasNextBatch()) {
+            reader.nextBatch();
+            while (reader.hasNext()) {
+                Struct record = reader.next();
+                checkData(record, recordCount);
+                recordCount++;
+            }
+            batchCount++;
+        }
+
+        assertThrows(NoSuchElementException.class, reader::nextBatch);
+        assertEquals(NUM_RECORDS / batchSize, batchCount, "The number of batches processed does not match");
+        assertEquals(NUM_RECORDS, recordCount, "The number of records in the file does not match");
+    }
+
     @Override
     protected Map<String, Object> getReaderConfig() {
         return new HashMap<>();
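One caveat on the assertions: batchCount is compared against NUM_RECORDS / batchSize using integer division, so the test implicitly assumes NUM_RECORDS is a multiple of 10; a file with a trailing partial batch would yield one extra batch and fail the first assertEquals.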

src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/ReaderFsTestConfig.java

Lines changed: 1 addition & 1 deletion

@@ -8,7 +8,7 @@
 import java.util.HashMap;
 import java.util.Map;
 
-interface ReaderFsTestConfig extends FsTestConfig {
+public interface ReaderFsTestConfig extends FsTestConfig {
 
     void setDataFile(Path dataFile);
 
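Widening ReaderFsTestConfig from package-private to public makes the interface visible outside the file.reader test package; presumably the new test scaffolding in the parent package needs to reference it.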
