Skip to content

Commit 50fe680

Browse files
committed
nested support
1 parent b0bf6bc commit 50fe680

File tree

14 files changed

+344
-98
lines changed

14 files changed

+344
-98
lines changed

java/vector/src/main/codegen/templates/ArrowType.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,14 @@ public byte getTypeType() {
7575

7676
@Override
7777
public int getType(FlatBufferBuilder builder) {
78+
<#list type.fields as field>
79+
<#if field.type == "String">
80+
int ${field.name} = builder.createString(this.${field.name});
81+
</#if>
82+
</#list>
7883
org.apache.arrow.flatbuf.${type.name}.start${type.name}(builder);
7984
<#list type.fields as field>
80-
org.apache.arrow.flatbuf.${type.name}.add${field.name?cap_first}(builder, <#if field.type == "String">builder.createString(${field.name})<#else>${field.name}</#if>);
85+
org.apache.arrow.flatbuf.${type.name}.add${field.name?cap_first}(builder, ${field.name});
8186
</#list>
8287
return org.apache.arrow.flatbuf.${type.name}.end${type.name}(builder);
8388
}

java/vector/src/main/codegen/templates/NullableValueVectors.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -126,20 +126,32 @@ public List<FieldVector> getChildrenFromFields() {
126126
}
127127

128128
public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) {
129-
if (ownBuffers.size() != 2) {
130-
throw new IllegalArgumentException("Illegal buffer count, expected 2, got: " + ownBuffers.size());
131-
}
132-
bits.data = ownBuffers.get(0);
133-
bits.data.retain(allocator);
134-
values.data = ownBuffers.get(1);
135-
values.data.retain(allocator);
129+
int expectedSize = <#if type.major = "VarLen">3<#else>2</#if>;
130+
if (ownBuffers.size() != expectedSize) {
131+
throw new IllegalArgumentException("Illegal buffer count, expected " + expectedSize + ", got: " + ownBuffers.size());
132+
}
133+
bits.load(ownBuffers.get(0));
134+
<#if type.major = "VarLen">
135+
values.offsetVector.load(ownBuffers.get(1));
136+
values.load(ownBuffers.get(2));
137+
<#else>
138+
values.load(ownBuffers.get(1));
139+
</#if>
136140
// TODO: do something with the sizes in fieldNode?
137141
}
138142

139143
public List<ArrowBuf> getFieldBuffers() {
140144
bits.getBuffer().readerIndex(0);
145+
<#if type.major = "VarLen">
146+
values.offsetVector.getBuffer().readerIndex(0);
147+
</#if>
141148
values.getBuffer().readerIndex(0);
142-
return Arrays.asList(bits.getBuffer(), values.getBuffer());
149+
return Arrays.asList(
150+
bits.getBuffer(),
151+
<#if type.major = "VarLen">
152+
values.offsetVector.getBuffer(),
153+
</#if>
154+
values.getBuffer());
143155
}
144156

145157
@Override

java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,10 @@ public ArrowBuf getBuffer() {
8484
return data;
8585
}
8686

87+
/**
 * Replaces the buffer held by this vector with the given one.
 *
 * <p>The incoming buffer is retained against this vector's allocator first and the buffer
 * that was held before is released afterwards, so the old buffer is not leaked when it is
 * overwritten and re-loading the buffer that is already held cannot drop its only reference.
 *
 * @param data the buffer to load into this vector
 */
public void load(ArrowBuf data) {
final ArrowBuf previous = this.data;
88+
this.data = data.retain(allocator);
// Release only after the new buffer has been retained (see method comment).
if (previous != null) {
previous.release();
}
89+
}
90+
8791
/**
8892
* This method has a similar effect of allocateNew() without actually clearing and reallocating
8993
* the value vector. The purpose is to move the value vector to a "mutate" state

java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,5 @@ public interface FieldVector extends ValueVector {
2525
* @return the buffers belonging to this vector
2626
*/
2727
List<ArrowBuf> getFieldBuffers();
28+
2829
}

java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import java.util.Iterator;
77
import java.util.List;
88

9-
import org.apache.arrow.vector.complex.MapVector;
109
import org.apache.arrow.vector.schema.ArrowFieldNode;
1110
import org.apache.arrow.vector.schema.ArrowRecordBatch;
1211
import org.apache.arrow.vector.schema.VectorLayout;
@@ -19,11 +18,11 @@ public class VectorLoader {
1918
private final List<FieldVector> fieldVectors;
2019
private final List<Field> fields;
2120

22-
public VectorLoader(Schema schema, MapVector root) {
21+
public VectorLoader(Schema schema, FieldVector root) {
2322
super();
2423
this.fields = schema.getFields();
25-
root.initializeChildren(fields);
26-
this.fieldVectors = root.getFieldVectors();
24+
root.initializeChildrenFromFields(fields);
25+
this.fieldVectors = root.getChildrenFromFields();
2726
if (this.fieldVectors.size() != fields.size()) {
2827
throw new IllegalArgumentException(); //TODO
2928
}
@@ -46,7 +45,11 @@ private void loadBuffers(FieldVector vector, Field field, Iterator<ArrowBuf> buf
4645
for (int j = 0; j < typeLayout.size(); j++) {
4746
ownBuffers.add(buffers.next());
4847
}
49-
vector.loadFieldBuffers(fieldNode, ownBuffers);
48+
try {
49+
vector.loadFieldBuffers(fieldNode, ownBuffers);
50+
} catch (RuntimeException e) {
51+
throw new IllegalArgumentException("Could not load buffers for field " + field);
52+
}
5053
List<Field> children = field.getChildren();
5154
if (children.size() > 0) {
5255
List<FieldVector> childrenFromFields = vector.getChildrenFromFields();

java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,35 +4,37 @@
44
import java.util.List;
55

66
import org.apache.arrow.vector.ValueVector.Accessor;
7-
import org.apache.arrow.vector.complex.MapVector;
87
import org.apache.arrow.vector.schema.ArrowFieldNode;
98
import org.apache.arrow.vector.schema.ArrowRecordBatch;
10-
import org.apache.arrow.vector.types.pojo.Field;
9+
import org.apache.arrow.vector.schema.ArrowVectorType;
1110
import org.apache.arrow.vector.types.pojo.Schema;
1211

1312
import io.netty.buffer.ArrowBuf;
1413

1514
public class VectorUnloader {
1615

17-
private final MapVector parent;
16+
private final Schema schema;
17+
private final int valueCount;
18+
private final List<FieldVector> vectors;
1819

19-
public VectorUnloader(MapVector parent) {
20+
public VectorUnloader(FieldVector parent) {
2021
super();
21-
this.parent = parent;
22+
this.schema = new Schema(parent.getField().getChildren());
23+
this.valueCount = parent.getAccessor().getValueCount();
24+
this.vectors = parent.getChildrenFromFields();
2225
}
2326

2427
public Schema getSchema() {
25-
Field rootField = parent.getField();
26-
return new Schema(rootField.getChildren());
28+
return schema;
2729
}
2830

2931
public ArrowRecordBatch getRecordBatch() {
3032
List<ArrowFieldNode> nodes = new ArrayList<>();
3133
List<ArrowBuf> buffers = new ArrayList<>();
32-
for (FieldVector vector : parent.getFieldVectors()) {
34+
for (FieldVector vector : vectors) {
3335
appendNodes(vector, nodes, buffers);
3436
}
35-
return new ArrowRecordBatch(parent.getAccessor().getValueCount(), nodes, buffers);
37+
return new ArrowRecordBatch(valueCount, nodes, buffers);
3638
}
3739

3840
private void appendNodes(FieldVector vector, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers) {
@@ -46,8 +48,12 @@ private void appendNodes(FieldVector vector, List<ArrowFieldNode> nodes, List<Ar
4648
}
4749
}
4850
nodes.add(new ArrowFieldNode(accessor.getValueCount(), nullCount));
49-
// TODO: validate buffer count
50-
buffers.addAll(vector.getFieldBuffers());
51+
List<ArrowBuf> fieldBuffers = vector.getFieldBuffers();
52+
List<ArrowVectorType> expectedBuffers = vector.getField().getTypeLayout().getVectorTypes();
53+
if (fieldBuffers.size() != expectedBuffers.size()) {
54+
throw new IllegalArgumentException("wrong number of buffers for field " + vector.getField() + ". found: " + fieldBuffers);
55+
}
56+
buffers.addAll(fieldBuffers);
5157
for (FieldVector child : vector.getChildrenFromFields()) {
5258
appendNodes(child, nodes, buffers);
5359
}

java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,15 @@
1818
******************************************************************************/
1919
package org.apache.arrow.vector.complex;
2020

21+
import static java.util.Arrays.asList;
22+
23+
import java.util.Arrays;
2124
import java.util.List;
2225

2326
import org.apache.arrow.memory.BufferAllocator;
2427
import org.apache.arrow.memory.OutOfMemoryException;
2528
import org.apache.arrow.vector.AddOrGetResult;
29+
import org.apache.arrow.vector.FieldVector;
2630
import org.apache.arrow.vector.UInt1Vector;
2731
import org.apache.arrow.vector.UInt4Vector;
2832
import org.apache.arrow.vector.ValueVector;
@@ -32,6 +36,8 @@
3236
import org.apache.arrow.vector.complex.impl.UnionListWriter;
3337
import org.apache.arrow.vector.complex.reader.FieldReader;
3438
import org.apache.arrow.vector.complex.writer.FieldWriter;
39+
import org.apache.arrow.vector.schema.ArrowFieldNode;
40+
import org.apache.arrow.vector.types.Types;
3541
import org.apache.arrow.vector.types.Types.MinorType;
3642
import org.apache.arrow.vector.types.pojo.Field;
3743
import org.apache.arrow.vector.util.CallBack;
@@ -43,7 +49,7 @@
4349

4450
import io.netty.buffer.ArrowBuf;
4551

46-
public class ListVector extends BaseRepeatedValueVector {
52+
public class ListVector extends BaseRepeatedValueVector implements FieldVector {
4753

4854
UInt4Vector offsets;// TODO: THis masks the same vector in the parent
4955
final UInt1Vector bits;
@@ -62,6 +68,41 @@ public ListVector(String name, BufferAllocator allocator, CallBack callBack) {
6268
this.callBack = callBack;
6369
}
6470

71+
/**
 * Creates the single element vector of this list from its field description and then
 * initializes that vector's own children, so nested types are built recursively.
 *
 * @param children the child fields of this list; must contain exactly one field
 * @throws IllegalArgumentException if {@code children} does not hold exactly one field, or if
 *         the element vector already existed
 */
@Override
72+
public void initializeChildrenFromFields(List<Field> children) {
73+
if (children.size() != 1) {
74+
throw new IllegalArgumentException("Lists have only one child. Found: " + children);
75+
}
76+
Field field = children.get(0);
77+
MinorType minorType = Types.getMinorTypeForArrowType(field.getType());
78+
AddOrGetResult<FieldVector> addOrGetVector = addOrGetVector(minorType);
79+
if (!addOrGetVector.isCreated()) {
80+
throw new IllegalArgumentException("Child vector already existed: " + addOrGetVector.getVector());
81+
}
// Recurse into the element vector, as MapVector does for each of its children; without this
// a list of lists or a list of maps would be left without its nested vectors.
addOrGetVector.getVector().initializeChildrenFromFields(field.getChildren());
82+
}
83+
84+
/**
 * Returns the children of this list as field vectors.
 *
 * @return a one-element list holding the data vector, cast to {@link FieldVector}
 */
@Override
85+
public List<FieldVector> getChildrenFromFields() {
86+
// TODO: data vector should be that type
87+
return Arrays.asList((FieldVector)getDataVector());
88+
}
89+
90+
/**
 * Loads this list's own buffers: the bits buffer followed by the offsets buffer.
 *
 * @param fieldNode the field node for this vector (not used here)
 * @param ownBuffers exactly two buffers: index 0 is loaded into {@code bits}, index 1 into
 *        {@code offsets}
 * @throws IllegalArgumentException if {@code ownBuffers} does not contain exactly two buffers
 */
@Override
91+
public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) {
92+
if (ownBuffers.size() != 2) {
93+
throw new IllegalArgumentException("Lists have a validity and offset vector. Found: " + ownBuffers);
94+
}
95+
this.bits.load(ownBuffers.get(0));
96+
// The offsets are the second buffer; index 0 is the bits buffer loaded just above.
this.offsets.load(ownBuffers.get(1));
97+
}
98+
99+
/**
 * Returns this list's own buffers after resetting the reader index of each to 0.
 *
 * @return the bits buffer followed by the offsets buffer
 */
@Override
100+
public List<ArrowBuf> getFieldBuffers() {
101+
bits.getBuffer().readerIndex(0);
102+
offsets.getBuffer().readerIndex(0);
103+
return asList(bits.getBuffer(), offsets.getBuffer());
104+
}
105+
65106
public UnionListWriter getWriter() {
66107
return writer;
67108
}
@@ -297,4 +338,5 @@ public void setValueCount(int valueCount) {
297338
bits.getMutator().setValueCount(valueCount);
298339
}
299340
}
341+
300342
}

java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package org.apache.arrow.vector.complex;
1919

20+
import static java.util.Arrays.asList;
21+
2022
import java.util.ArrayList;
2123
import java.util.Collection;
2224
import java.util.Iterator;
@@ -32,6 +34,7 @@
3234
import org.apache.arrow.vector.complex.impl.SingleMapReaderImpl;
3335
import org.apache.arrow.vector.complex.reader.FieldReader;
3436
import org.apache.arrow.vector.holders.ComplexHolder;
37+
import org.apache.arrow.vector.schema.ArrowFieldNode;
3538
import org.apache.arrow.vector.types.Types;
3639
import org.apache.arrow.vector.types.Types.MinorType;
3740
import org.apache.arrow.vector.types.pojo.ArrowType.Tuple;
@@ -46,7 +49,7 @@
4649

4750
import io.netty.buffer.ArrowBuf;
4851

49-
public class MapVector extends AbstractMapVector {
52+
public class MapVector extends AbstractMapVector implements FieldVector {
5053
//private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapVector.class);
5154

5255
private final SingleMapReaderImpl reader = new SingleMapReaderImpl(MapVector.this);
@@ -326,24 +329,34 @@ public void close() {
326329
super.close();
327330
}
328331

329-
private List<FieldVector> fieldChildren;
330-
331-
public void initializeChildren(List<Field> children) {
332-
if (fieldChildren != null) {
333-
throw new IllegalArgumentException(children.toString()); //TODO
334-
}
335-
fieldChildren = new ArrayList<>();
332+
@Override
333+
public void initializeChildrenFromFields(List<Field> children) {
336334
for (Field field : children) {
337335
MinorType minorType = Types.getMinorTypeForArrowType(field.getType());
338336
FieldVector vector = (FieldVector)this.add(field.getName(), minorType);
339-
fieldChildren.add(vector);
340337
vector.initializeChildrenFromFields(field.getChildren());
341338
}
342339
}
343340

344-
public List<FieldVector> getFieldVectors() {
345-
// TODO: clean this up
341+
/**
 * Returns the children of this map viewed as field vectors.
 *
 * <p>The result of {@code getChildren()} is re-typed through an unchecked double cast; the
 * element types are not verified here.
 *
 * @return the children of this vector as a {@code List<FieldVector>}
 */
@Override
342+
public List<FieldVector> getChildrenFromFields() {
343+
// TODO: children should be the right type
346344
return (List<FieldVector>)(List<?>)getChildren();
347345
}
348346

347+
/**
 * Validates the buffers supplied for this map. Only the buffer count is checked; the buffer
 * itself is not stored because the load call below is commented out.
 *
 * @param fieldNode the field node for this vector (not used here)
 * @param ownBuffers the buffers for this vector; must contain exactly one buffer
 * @throws IllegalArgumentException if {@code ownBuffers} does not contain exactly one buffer
 */
@Override
348+
public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) {
349+
if (ownBuffers.size() != 1) {
350+
throw new IllegalArgumentException("Tuples have a validity. Found: " + ownBuffers);
351+
}
352+
// this.bits.load(ownBuffers.get(0));
353+
// TODO: add validity vector to make maps nullable
354+
}
355+
356+
/**
 * Returns the buffers of this map itself, which is currently a single placeholder buffer.
 *
 * @return a one-element list holding the allocator's empty buffer
 */
@Override
357+
public List<ArrowBuf> getFieldBuffers() {
358+
// TODO: add validity vector to make maps nullable
359+
return asList(allocator.getEmpty());
360+
}
361+
349362
}

java/vector/src/main/java/org/apache/arrow/vector/complex/impl/ComplexWriterImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@
2222
import org.apache.arrow.vector.complex.StateTool;
2323
import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter;
2424
import org.apache.arrow.vector.types.Types.MinorType;
25+
import org.apache.arrow.vector.types.pojo.Field;
2526

2627
import com.google.common.base.Preconditions;
27-
import org.apache.arrow.vector.types.pojo.Field;
2828

2929
public class ComplexWriterImpl extends AbstractFieldWriter implements ComplexWriter {
3030
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ComplexWriterImpl.class);

java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,15 +116,17 @@ public ArrowRecordBatch readRecordBatch(ArrowBlock recordBatchBlock) throws IOEx
116116
if (l < 0) {
117117
throw new InvalidArrowFileException("block invalid: " + recordBatchBlock);
118118
}
119-
ArrowBuf buffer = allocator.buffer(l);
119+
final ArrowBuf buffer = allocator.buffer(l);
120+
LOGGER.debug("allocated buffer " + buffer);
120121
in.position(recordBatchBlock.getOffset());
121122
int n = readFully(buffer, l);
122123
if (n != l) {
123124
throw new IllegalStateException(n + " != " + l);
124125
}
125126
RecordBatch recordBatchFB = RecordBatch.getRootAsRecordBatch(buffer.nioBuffer().asReadOnlyBuffer());
126127
int nodesLength = recordBatchFB.nodesLength();
127-
ArrowBuf body = buffer.slice(recordBatchBlock.getMetadataLength(), (int)recordBatchBlock.getBodyLength());
128+
final ArrowBuf body = buffer.slice(recordBatchBlock.getMetadataLength(), (int)recordBatchBlock.getBodyLength());
129+
LOGGER.debug("sliced body " + body);
128130
List<ArrowFieldNode> nodes = new ArrayList<>();
129131
for (int i = 0; i < nodesLength; ++i) {
130132
FieldNode node = recordBatchFB.nodes(i);
@@ -135,8 +137,11 @@ public ArrowRecordBatch readRecordBatch(ArrowBlock recordBatchBlock) throws IOEx
135137
Buffer bufferFB = recordBatchFB.buffers(i);
136138
LOGGER.debug(String.format("Buffer in RecordBatch at %d, length: %d", bufferFB.offset(), bufferFB.length()));
137139
ArrowBuf vectorBuffer = body.slice((int)bufferFB.offset(), (int)bufferFB.length());
140+
LOGGER.debug("sliced vectorBuffer " + vectorBuffer);
141+
vectorBuffer.retain();
138142
buffers.add(vectorBuffer);
139143
}
144+
buffer.release();
140145
return new ArrowRecordBatch(recordBatchFB.length(), nodes, buffers);
141146
}
142147

0 commit comments

Comments
 (0)