Skip to content

Commit 9b78adc

Browse files
julienledem authored and kou committed
ARROW-274: Add NullableMapVector to support nullable maps
Author: Julien Le Dem <[email protected]>

Closes apache#128 from julienledem/nullable_map and squashes the following commits:

d98580a [Julien Le Dem] review feedback
ee1dd45 [Julien Le Dem] Fix complex writers/readers
8780f48 [Julien Le Dem] ARROW-274: Add NullableMapVector to support nullable maps
1 parent 000209c commit 9b78adc

File tree

19 files changed

+408
-137
lines changed

19 files changed

+408
-137
lines changed

vector/src/main/codegen/templates/MapWriters.java

Lines changed: 16 additions & 39 deletions
Original file line number | Diff line number | Diff line change
@@ -17,14 +17,13 @@
1717
*/
1818

1919
<@pp.dropOutputFile />
20-
<#list ["Single"] as mode>
20+
<#list ["Nullable", "Single"] as mode>
2121
<@pp.changeOutputFile name="/org/apache/arrow/vector/complex/impl/${mode}MapWriter.java" />
22+
<#assign index = "idx()">
2223
<#if mode == "Single">
2324
<#assign containerClass = "MapVector" />
24-
<#assign index = "idx()">
2525
<#else>
26-
<#assign containerClass = "RepeatedMapVector" />
27-
<#assign index = "currentChildIndex">
26+
<#assign containerClass = "NullableMapVector" />
2827
</#if>
2928
3029
<#include "/@includes/license.ftl" />
@@ -49,9 +48,13 @@ public class ${mode}MapWriter extends AbstractFieldWriter {
4948
5049
protected final ${containerClass} container;
5150
private final Map<String, FieldWriter> fields = Maps.newHashMap();
52-
<#if mode == "Repeated">private int currentChildIndex = 0;</#if>
5351
5452
public ${mode}MapWriter(${containerClass} container) {
53+
<#if mode == "Single">
54+
if (container instanceof NullableMapVector) {
55+
throw new IllegalArgumentException("Invalid container: " + container);
56+
}
57+
</#if>
5558
this.container = container;
5659
}
5760
@@ -75,12 +78,12 @@ public MapWriter map(String name) {
7578
FieldWriter writer = fields.get(name.toLowerCase());
7679
if(writer == null){
7780
int vectorCount=container.size();
78-
MapVector vector = container.addOrGet(name, MinorType.MAP, MapVector.class);
81+
NullableMapVector vector = container.addOrGet(name, MinorType.MAP, NullableMapVector.class);
7982
writer = new PromotableWriter(vector, container);
8083
if(vectorCount != container.size()) {
8184
writer.allocate();
8285
}
83-
writer.setPosition(${index});
86+
writer.setPosition(idx());
8487
fields.put(name.toLowerCase(), writer);
8588
}
8689
return writer;
@@ -117,40 +120,12 @@ public ListWriter list(String name) {
117120
if (container.size() > vectorCount) {
118121
writer.allocate();
119122
}
120-
writer.setPosition(${index});
123+
writer.setPosition(idx());
121124
fields.put(name.toLowerCase(), writer);
122125
}
123126
return writer;
124127
}
125128
126-
<#if mode == "Repeated">
127-
public void start() {
128-
// update the repeated vector to state that there is current+1 objects.
129-
final RepeatedMapHolder h = new RepeatedMapHolder();
130-
final RepeatedMapVector map = (RepeatedMapVector) container;
131-
final RepeatedMapVector.Mutator mutator = map.getMutator();
132-
133-
// Make sure that the current vector can support the end position of this list.
134-
if(container.getValueCapacity() <= idx()) {
135-
mutator.setValueCount(idx()+1);
136-
}
137-
138-
map.getAccessor().get(idx(), h);
139-
if (h.start >= h.end) {
140-
container.getMutator().startNewValue(idx());
141-
}
142-
currentChildIndex = container.getMutator().add(idx());
143-
for(final FieldWriter w : fields.values()) {
144-
w.setPosition(currentChildIndex);
145-
}
146-
}
147-
148-
149-
public void end() {
150-
// noop
151-
}
152-
<#else>
153-
154129
public void setValueCount(int count) {
155130
container.getMutator().setValueCount(count);
156131
}
@@ -165,14 +140,16 @@ public void setPosition(int index) {
165140
166141
@Override
167142
public void start() {
143+
<#if mode == "Single">
144+
<#else>
145+
container.getMutator().setIndexDefined(idx());
146+
</#if>
168147
}
169148
170149
@Override
171150
public void end() {
172151
}
173152
174-
</#if>
175-
176153
<#list vv.types as type><#list type.minor as minor>
177154
<#assign lowerName = minor.class?uncap_first />
178155
<#if lowerName == "int" ><#assign lowerName = "integer" /></#if>
@@ -204,7 +181,7 @@ public void end() {
204181
if (currentVector == null || currentVector != vector) {
205182
vector.allocateNewSafe();
206183
}
207-
writer.setPosition(${index});
184+
writer.setPosition(idx());
208185
fields.put(name.toLowerCase(), writer);
209186
}
210187
return writer;

vector/src/main/codegen/templates/UnionListWriter.java

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -160,11 +160,13 @@ public void start() {
160160
vector.getMutator().setNotNull(idx());
161161
offsets.getMutator().setSafe(idx() + 1, nextOffset);
162162
writer.setPosition(nextOffset);
163+
writer.start();
163164
}
164165
165166
@Override
166167
public void end() {
167168
// if (inMap) {
169+
writer.end();
168170
inMap = false;
169171
final int nextOffset = offsets.getAccessor().get(idx() + 1);
170172
offsets.getMutator().setSafe(idx() + 1, nextOffset + 1);

vector/src/main/codegen/templates/UnionVector.java

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -72,7 +72,7 @@ public class UnionVector implements FieldVector {
7272
MapVector internalMap;
7373
UInt1Vector typeVector;
7474

75-
private MapVector mapVector;
75+
private NullableMapVector mapVector;
7676
private ListVector listVector;
7777

7878
private FieldReader reader;
@@ -127,10 +127,10 @@ public List<BufferBacked> getFieldInnerVectors() {
127127
throw new UnsupportedOperationException();
128128
}
129129

130-
public MapVector getMap() {
130+
public NullableMapVector getMap() {
131131
if (mapVector == null) {
132132
int vectorCount = internalMap.size();
133-
mapVector = internalMap.addOrGet("map", MinorType.MAP, MapVector.class);
133+
mapVector = internalMap.addOrGet("map", MinorType.MAP, NullableMapVector.class);
134134
if (internalMap.size() > vectorCount) {
135135
mapVector.allocateNew();
136136
if (callBack != null) {

vector/src/main/codegen/templates/UnionWriter.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -74,7 +74,7 @@ public void endList() {
7474

7575
private MapWriter getMapWriter() {
7676
if (mapWriter == null) {
77-
mapWriter = new SingleMapWriter(data.getMap());
77+
mapWriter = new NullableMapWriter(data.getMap());
7878
mapWriter.setPosition(idx());
7979
writers.add(mapWriter);
8080
}

vector/src/main/java/org/apache/arrow/vector/NullableVector.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,7 @@
1717
*/
1818
package org.apache.arrow.vector;
1919

20-
public interface NullableVector extends ValueVector{
20+
public interface NullableVector extends ValueVector {
2121

2222
ValueVector getValuesVector();
2323
}

vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -68,7 +68,9 @@ private void appendNodes(FieldVector vector, List<ArrowFieldNode> nodes, List<Ar
6868
List<ArrowBuf> fieldBuffers = vector.getFieldBuffers();
6969
List<ArrowVectorType> expectedBuffers = vector.getField().getTypeLayout().getVectorTypes();
7070
if (fieldBuffers.size() != expectedBuffers.size()) {
71-
throw new IllegalArgumentException("wrong number of buffers for field " + vector.getField() + ". found: " + fieldBuffers);
71+
throw new IllegalArgumentException(String.format(
72+
"wrong number of buffers for field %s in vector %s. found: %s",
73+
vector.getField(), vector.getClass().getSimpleName(), fieldBuffers));
7274
}
7375
buffers.addAll(fieldBuffers);
7476
for (FieldVector child : vector.getChildrenFromFields()) {

vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java

Lines changed: 6 additions & 47 deletions
Original file line number | Diff line number | Diff line change
@@ -18,25 +18,20 @@
1818
package org.apache.arrow.vector.complex;
1919

2020
import java.util.ArrayList;
21-
import java.util.Arrays;
2221
import java.util.Collection;
23-
import java.util.Collections;
2422
import java.util.Iterator;
2523
import java.util.List;
2624
import java.util.Map;
2725

2826
import javax.annotation.Nullable;
2927

3028
import org.apache.arrow.memory.BufferAllocator;
31-
import org.apache.arrow.vector.BaseDataValueVector;
3229
import org.apache.arrow.vector.BaseValueVector;
33-
import org.apache.arrow.vector.BufferBacked;
3430
import org.apache.arrow.vector.FieldVector;
3531
import org.apache.arrow.vector.ValueVector;
3632
import org.apache.arrow.vector.complex.impl.SingleMapReaderImpl;
3733
import org.apache.arrow.vector.complex.reader.FieldReader;
3834
import org.apache.arrow.vector.holders.ComplexHolder;
39-
import org.apache.arrow.vector.schema.ArrowFieldNode;
4035
import org.apache.arrow.vector.types.Types;
4136
import org.apache.arrow.vector.types.Types.MinorType;
4237
import org.apache.arrow.vector.types.pojo.ArrowType.Tuple;
@@ -49,26 +44,20 @@
4944
import com.google.common.collect.Ordering;
5045
import com.google.common.primitives.Ints;
5146

52-
import io.netty.buffer.ArrowBuf;
53-
54-
public class MapVector extends AbstractMapVector implements FieldVector {
47+
public class MapVector extends AbstractMapVector {
5548
//private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapVector.class);
5649

57-
private final SingleMapReaderImpl reader = new SingleMapReaderImpl(MapVector.this);
50+
private final SingleMapReaderImpl reader = new SingleMapReaderImpl(this);
5851
private final Accessor accessor = new Accessor();
5952
private final Mutator mutator = new Mutator();
6053
int valueCount;
6154

62-
// TODO: validity vector
63-
private final List<BufferBacked> innerVectors = Collections.unmodifiableList(Arrays.<BufferBacked>asList());
64-
65-
public MapVector(String name, BufferAllocator allocator, CallBack callBack){
55+
public MapVector(String name, BufferAllocator allocator, CallBack callBack) {
6656
super(name, allocator, callBack);
6757
}
6858

6959
@Override
7060
public FieldReader getReader() {
71-
//return new SingleMapReaderImpl(MapVector.this);
7261
return reader;
7362
}
7463

@@ -124,18 +113,9 @@ public int getBufferSizeFor(final int valueCount) {
124113
return (int) bufferSize;
125114
}
126115

127-
@Override
128-
public ArrowBuf[] getBuffers(boolean clear) {
129-
int expectedSize = getBufferSize();
130-
int actualSize = super.getBufferSize();
131-
132-
Preconditions.checkArgument(expectedSize == actualSize, expectedSize + " != " + actualSize);
133-
return super.getBuffers(clear);
134-
}
135-
136116
@Override
137117
public TransferPair getTransferPair(BufferAllocator allocator) {
138-
return new MapTransferPair(this, name, allocator);
118+
return new MapTransferPair(this, new MapVector(name, allocator, callBack), false);
139119
}
140120

141121
@Override
@@ -145,18 +125,14 @@ public TransferPair makeTransferPair(ValueVector to) {
145125

146126
@Override
147127
public TransferPair getTransferPair(String ref, BufferAllocator allocator) {
148-
return new MapTransferPair(this, ref, allocator);
128+
return new MapTransferPair(this, new MapVector(ref, allocator, callBack), false);
149129
}
150130

151131
protected static class MapTransferPair implements TransferPair{
152132
private final TransferPair[] pairs;
153133
private final MapVector from;
154134
private final MapVector to;
155135

156-
public MapTransferPair(MapVector from, String name, BufferAllocator allocator) {
157-
this(from, new MapVector(name, allocator, from.callBack), false);
158-
}
159-
160136
public MapTransferPair(MapVector from, MapVector to) {
161137
this(from, to, true);
162138
}
@@ -335,7 +311,6 @@ public void close() {
335311
super.close();
336312
}
337313

338-
@Override
339314
public void initializeChildrenFromFields(List<Field> children) {
340315
for (Field field : children) {
341316
MinorType minorType = Types.getMinorTypeForArrowType(field.getType());
@@ -344,25 +319,9 @@ public void initializeChildrenFromFields(List<Field> children) {
344319
}
345320
}
346321

347-
@Override
322+
348323
public List<FieldVector> getChildrenFromFields() {
349324
return getChildren();
350325
}
351326

352-
@Override
353-
public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) {
354-
BaseDataValueVector.load(getFieldInnerVectors(), ownBuffers);
355-
// TODO: something with fieldNode?
356-
}
357-
358-
@Override
359-
public List<ArrowBuf> getFieldBuffers() {
360-
return BaseDataValueVector.unload(getFieldInnerVectors());
361-
}
362-
363-
@Override
364-
public List<BufferBacked> getFieldInnerVectors() {
365-
return innerVectors;
366-
}
367-
368327
}

0 commit comments

Comments
 (0)