Skip to content

Commit ee1dd45

Browse files
committed
Fix complex writers/readers
1 parent 8780f48 commit ee1dd45

File tree

10 files changed

+135
-73
lines changed

10 files changed

+135
-73
lines changed

java/vector/src/main/codegen/templates/MapWriters.java

Lines changed: 16 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,13 @@
1717
*/
1818

1919
<@pp.dropOutputFile />
20-
<#list ["Single"] as mode>
20+
<#list ["Nullable", "Single"] as mode>
2121
<@pp.changeOutputFile name="/org/apache/arrow/vector/complex/impl/${mode}MapWriter.java" />
22-
<#if mode == "Single">
23-
<#assign containerClass = "NullableMapVector" />
2422
<#assign index = "idx()">
23+
<#if mode == "Single">
24+
<#assign containerClass = "MapVector" />
2525
<#else>
26-
<#assign containerClass = "RepeatedMapVector" />
27-
<#assign index = "currentChildIndex">
26+
<#assign containerClass = "NullableMapVector" />
2827
</#if>
2928
3029
<#include "/@includes/license.ftl" />
@@ -49,9 +48,13 @@ public class ${mode}MapWriter extends AbstractFieldWriter {
4948
5049
protected final ${containerClass} container;
5150
private final Map<String, FieldWriter> fields = Maps.newHashMap();
52-
<#if mode == "Repeated">private int currentChildIndex = 0;</#if>
5351
5452
public ${mode}MapWriter(${containerClass} container) {
53+
<#if mode == "Single">
54+
if (container instanceof NullableMapVector) {
55+
throw new IllegalArgumentException("Invalid container: " + container);
56+
}
57+
</#if>
5558
this.container = container;
5659
}
5760
@@ -75,12 +78,12 @@ public MapWriter map(String name) {
7578
FieldWriter writer = fields.get(name.toLowerCase());
7679
if(writer == null){
7780
int vectorCount=container.size();
78-
MapVector vector = container.addOrGet(name, MinorType.MAP, NullableMapVector.class);
81+
NullableMapVector vector = container.addOrGet(name, MinorType.MAP, NullableMapVector.class);
7982
writer = new PromotableWriter(vector, container);
8083
if(vectorCount != container.size()) {
8184
writer.allocate();
8285
}
83-
writer.setPosition(${index});
86+
writer.setPosition(idx());
8487
fields.put(name.toLowerCase(), writer);
8588
}
8689
return writer;
@@ -117,40 +120,12 @@ public ListWriter list(String name) {
117120
if (container.size() > vectorCount) {
118121
writer.allocate();
119122
}
120-
writer.setPosition(${index});
123+
writer.setPosition(idx());
121124
fields.put(name.toLowerCase(), writer);
122125
}
123126
return writer;
124127
}
125128
126-
<#if mode == "Repeated">
127-
public void start() {
128-
// update the repeated vector to state that there is current+1 objects.
129-
final RepeatedMapHolder h = new RepeatedMapHolder();
130-
final RepeatedMapVector map = (RepeatedMapVector) container;
131-
final RepeatedMapVector.Mutator mutator = map.getMutator();
132-
133-
// Make sure that the current vector can support the end position of this list.
134-
if(container.getValueCapacity() <= idx()) {
135-
mutator.setValueCount(idx()+1);
136-
}
137-
138-
map.getAccessor().get(idx(), h);
139-
if (h.start >= h.end) {
140-
container.getMutator().startNewValue(idx());
141-
}
142-
currentChildIndex = container.getMutator().add(idx());
143-
for(final FieldWriter w : fields.values()) {
144-
w.setPosition(currentChildIndex);
145-
}
146-
}
147-
148-
149-
public void end() {
150-
// noop
151-
}
152-
<#else>
153-
154129
public void setValueCount(int count) {
155130
container.getMutator().setValueCount(count);
156131
}
@@ -165,15 +140,16 @@ public void setPosition(int index) {
165140
166141
@Override
167142
public void start() {
143+
<#if mode == "Single">
144+
<#else>
168145
container.getMutator().setIndexDefined(idx());
146+
</#if>
169147
}
170148
171149
@Override
172150
public void end() {
173151
}
174152
175-
</#if>
176-
177153
<#list vv.types as type><#list type.minor as minor>
178154
<#assign lowerName = minor.class?uncap_first />
179155
<#if lowerName == "int" ><#assign lowerName = "integer" /></#if>
@@ -205,7 +181,7 @@ public void end() {
205181
if (currentVector == null || currentVector != vector) {
206182
vector.allocateNewSafe();
207183
}
208-
writer.setPosition(${index});
184+
writer.setPosition(idx());
209185
fields.put(name.toLowerCase(), writer);
210186
}
211187
return writer;

java/vector/src/main/codegen/templates/UnionListWriter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,11 +160,13 @@ public void start() {
160160
vector.getMutator().setNotNull(idx());
161161
offsets.getMutator().setSafe(idx() + 1, nextOffset);
162162
writer.setPosition(nextOffset);
163+
writer.start();
163164
}
164165
165166
@Override
166167
public void end() {
167168
// if (inMap) {
169+
writer.end();
168170
inMap = false;
169171
final int nextOffset = offsets.getAccessor().get(idx() + 1);
170172
offsets.getMutator().setSafe(idx() + 1, nextOffset + 1);

java/vector/src/main/codegen/templates/UnionWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public void endList() {
7474

7575
private MapWriter getMapWriter() {
7676
if (mapWriter == null) {
77-
mapWriter = new SingleMapWriter(data.getMap());
77+
mapWriter = new NullableMapWriter(data.getMap());
7878
mapWriter.setPosition(idx());
7979
writers.add(mapWriter);
8080
}

java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
public class MapVector extends AbstractMapVector {
5050
//private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapVector.class);
5151

52-
private final SingleMapReaderImpl reader = new SingleMapReaderImpl(MapVector.this);
52+
private final SingleMapReaderImpl reader = new SingleMapReaderImpl(this);
5353
private final Accessor accessor = new Accessor();
5454
private final Mutator mutator = new Mutator();
5555
int valueCount;
@@ -60,7 +60,6 @@ public MapVector(String name, BufferAllocator allocator, CallBack callBack) {
6060

6161
@Override
6262
public FieldReader getReader() {
63-
//return new SingleMapReaderImpl(MapVector.this);
6463
return reader;
6564
}
6665

@@ -118,16 +117,16 @@ public int getBufferSizeFor(final int valueCount) {
118117

119118
@Override
120119
public ArrowBuf[] getBuffers(boolean clear) {
121-
int expectedSize = getBufferSize();
122-
int actualSize = super.getBufferSize();
123-
124-
Preconditions.checkArgument(expectedSize == actualSize, expectedSize + " != " + actualSize);
120+
// int expectedSize = getBufferSize();
121+
// int actualSize = super.getBufferSize();
122+
//
123+
// Preconditions.checkArgument(expectedSize == actualSize, expectedSize + " != " + actualSize);
125124
return super.getBuffers(clear);
126125
}
127126

128127
@Override
129128
public TransferPair getTransferPair(BufferAllocator allocator) {
130-
return new MapTransferPair(this, name, allocator);
129+
return new MapTransferPair(this, new MapVector(name, allocator, callBack), false);
131130
}
132131

133132
@Override
@@ -137,18 +136,14 @@ public TransferPair makeTransferPair(ValueVector to) {
137136

138137
@Override
139138
public TransferPair getTransferPair(String ref, BufferAllocator allocator) {
140-
return new MapTransferPair(this, ref, allocator);
139+
return new MapTransferPair(this, new MapVector(ref, allocator, callBack), false);
141140
}
142141

143142
protected static class MapTransferPair implements TransferPair{
144143
private final TransferPair[] pairs;
145144
private final MapVector from;
146145
private final MapVector to;
147146

148-
public MapTransferPair(MapVector from, String name, BufferAllocator allocator) {
149-
this(from, new MapVector(name, allocator, from.callBack), false);
150-
}
151-
152147
public MapTransferPair(MapVector from, MapVector to) {
153148
this(from, to, true);
154149
}

java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,23 @@
2929
import org.apache.arrow.vector.FieldVector;
3030
import org.apache.arrow.vector.NullableVectorDefinitionSetter;
3131
import org.apache.arrow.vector.UInt1Vector;
32+
import org.apache.arrow.vector.ValueVector;
33+
import org.apache.arrow.vector.complex.impl.NullableMapReaderImpl;
34+
import org.apache.arrow.vector.complex.reader.FieldReader;
3235
import org.apache.arrow.vector.holders.ComplexHolder;
3336
import org.apache.arrow.vector.schema.ArrowFieldNode;
3437
import org.apache.arrow.vector.util.CallBack;
38+
import org.apache.arrow.vector.util.TransferPair;
3539

3640
import com.google.common.collect.ObjectArrays;
3741

3842
import io.netty.buffer.ArrowBuf;
3943

4044
public class NullableMapVector extends MapVector implements FieldVector {
4145

42-
private final UInt1Vector bits;
46+
private final NullableMapReaderImpl reader = new NullableMapReaderImpl(this);
47+
48+
protected final UInt1Vector bits;
4349

4450
private final List<BufferBacked> innerVectors;
4551

@@ -70,6 +76,54 @@ public List<BufferBacked> getFieldInnerVectors() {
7076
return innerVectors;
7177
}
7278

79+
@Override
80+
public FieldReader getReader() {
81+
return reader;
82+
}
83+
84+
@Override
85+
public TransferPair getTransferPair(BufferAllocator allocator) {
86+
return new NullableMapTransferPair(this, new NullableMapVector(name, allocator, callBack), false);
87+
}
88+
89+
@Override
90+
public TransferPair makeTransferPair(ValueVector to) {
91+
return new NullableMapTransferPair(this, (NullableMapVector) to, true);
92+
}
93+
94+
@Override
95+
public TransferPair getTransferPair(String ref, BufferAllocator allocator) {
96+
return new NullableMapTransferPair(this, new NullableMapVector(ref, allocator, callBack), false);
97+
}
98+
99+
protected class NullableMapTransferPair extends MapTransferPair {
100+
101+
private NullableMapVector target;
102+
103+
protected NullableMapTransferPair(NullableMapVector from, NullableMapVector to, boolean allocate) {
104+
super(from, to, allocate);
105+
this.target = to;
106+
}
107+
108+
@Override
109+
public void transfer() {
110+
bits.transferTo(target.bits);
111+
super.transfer();
112+
}
113+
114+
@Override
115+
public void copyValueSafe(int fromIndex, int toIndex) {
116+
target.bits.copyFromSafe(fromIndex, toIndex, bits);
117+
super.copyValueSafe(fromIndex, toIndex);
118+
}
119+
120+
@Override
121+
public void splitAndTransfer(int startIndex, int length) {
122+
bits.splitAndTransferTo(startIndex, length, target.bits);
123+
super.splitAndTransfer(startIndex, length);
124+
}
125+
}
126+
73127
@Override
74128
public int getValueCapacity() {
75129
return Math.min(bits.getValueCapacity(), super.getValueCapacity());
@@ -92,6 +146,7 @@ public void clear() {
92146
super.clear();
93147
}
94148

149+
95150
@Override
96151
public int getBufferSize(){
97152
return super.getBufferSize() + bits.getBufferSize();
@@ -166,17 +221,13 @@ private Mutator(){
166221

167222
@Override
168223
public void setIndexDefined(int index){
169-
bits.getMutator().set(index, 1);
224+
bits.getMutator().setSafe(index, 1);
170225
}
171226

172227
public void setNull(int index){
173228
bits.getMutator().setSafe(index, 0);
174229
}
175230

176-
public boolean isSafe(int outIndex) {
177-
return outIndex < NullableMapVector.this.getValueCapacity();
178-
}
179-
180231
@Override
181232
public void setValueCount(int valueCount) {
182233
assert valueCount >= 0;

java/vector/src/main/java/org/apache/arrow/vector/complex/impl/AbstractBaseReader.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,10 @@
1919

2020
import java.util.Iterator;
2121

22-
import com.google.flatbuffers.FlatBufferBuilder;
23-
import org.apache.arrow.flatbuf.Type;
24-
import org.apache.arrow.flatbuf.Union;
25-
import org.apache.arrow.flatbuf.UnionMode;
2622
import org.apache.arrow.vector.complex.reader.FieldReader;
2723
import org.apache.arrow.vector.complex.writer.BaseWriter.ListWriter;
2824
import org.apache.arrow.vector.complex.writer.FieldWriter;
2925
import org.apache.arrow.vector.holders.UnionHolder;
30-
import org.apache.arrow.vector.types.pojo.Field;
3126

3227

3328
abstract class AbstractBaseReader implements FieldReader{
@@ -44,7 +39,7 @@ public void setPosition(int index){
4439
this.index = index;
4540
}
4641

47-
int idx(){
42+
protected int idx(){
4843
return index;
4944
}
5045

java/vector/src/main/java/org/apache/arrow/vector/complex/impl/ComplexWriterImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
public class ComplexWriterImpl extends AbstractFieldWriter implements ComplexWriter {
3131
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ComplexWriterImpl.class);
3232

33-
private SingleMapWriter mapRoot;
33+
private NullableMapWriter mapRoot;
3434
private UnionListWriter listRoot;
3535
private final MapVector container;
3636

@@ -123,7 +123,7 @@ public MapWriter directMap(){
123123

124124
case INIT:
125125
NullableMapVector map = (NullableMapVector) container;
126-
mapRoot = new SingleMapWriter(map);
126+
mapRoot = new NullableMapWriter(map);
127127
mapRoot.setPosition(idx());
128128
mode = Mode.MAP;
129129
break;
@@ -144,7 +144,7 @@ public MapWriter rootAsMap() {
144144

145145
case INIT:
146146
NullableMapVector map = container.addOrGet(name, MinorType.MAP, NullableMapVector.class);
147-
mapRoot = new SingleMapWriter(map);
147+
mapRoot = new NullableMapWriter(map);
148148
mapRoot.setPosition(idx());
149149
mode = Mode.MAP;
150150
break;

0 commit comments

Comments
 (0)