Skip to content

Commit 803afeb

Browse files
committed
ARROW-264: File format
This is work in progress Author: Julien Le Dem <[email protected]> Closes #123 from julienledem/arrow_264_file_format and squashes the following commits: 252de6d [Julien Le Dem] remove outdated comment 04d797f [Julien Le Dem] maps are not nullable yet e8359b3 [Julien Le Dem] align on 8 byte boundaries; more tests 8b8b823 [Julien Le Dem] refactoring 31e95e6 [Julien Le Dem] fix list vector b824938 [Julien Le Dem] fix types; add licenses; more tests; more complex 2fd3bc1 [Julien Le Dem] cleanup 50fe680 [Julien Le Dem] nested support b0bf6bc [Julien Le Dem] cleanup 4247b1a [Julien Le Dem] fix whitespace d6a1788 [Julien Le Dem] refactoring 81863c5 [Julien Le Dem] fixed loader aa1b766 [Julien Le Dem] better test 2067e01 [Julien Le Dem] update format aacf61e [Julien Le Dem] fix pom b907aa5 [Julien Le Dem] simplify e43f26b [Julien Le Dem] add layout spec 0cc9718 [Julien Le Dem] add vector type ac6902a [Julien Le Dem] ARROW-264: File format 807db51 [Julien Le Dem] move information to schema f2f0596 [Julien Le Dem] Update FieldNode structure to be more explicit and reflect schema
1 parent ec51d56 commit 803afeb

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+2722
-162
lines changed

cpp/src/arrow/ipc/metadata-internal.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ static Status FieldToFlatbuffer(
219219
RETURN_NOT_OK(TypeToFlatbuffer(fbb, field->type, &children, &type_enum, &type_data));
220220
auto fb_children = fbb.CreateVector(children);
221221

222+
// TODO: produce the list of VectorTypes
222223
*offset = flatbuf::CreateField(
223224
fbb, fb_name, field->nullable, type_enum, type_data, field->dictionary,
224225
fb_children);

format/File.fbs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
include "Message.fbs";
2+
3+
namespace org.apache.arrow.flatbuf;
4+
5+
/// ----------------------------------------------------------------------
6+
/// Arrow File metadata
7+
///
8+
9+
table Footer {
10+
11+
schema: org.apache.arrow.flatbuf.Schema;
12+
13+
dictionaries: [ Block ];
14+
15+
recordBatches: [ Block ];
16+
}
17+
18+
struct Block {
19+
20+
offset: long;
21+
22+
metaDataLength: int;
23+
24+
bodyLength: long;
25+
26+
}
27+
28+
root_type Footer;

format/Message.fbs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ table Tuple {
1717
table List {
1818
}
1919

20-
enum UnionMode:int { Sparse, Dense }
20+
enum UnionMode:short { Sparse, Dense }
2121

2222
table Union {
2323
mode: UnionMode;
@@ -28,7 +28,7 @@ table Int {
2828
is_signed: bool;
2929
}
3030

31-
enum Precision:int {SINGLE, DOUBLE}
31+
enum Precision:short {SINGLE, DOUBLE}
3232

3333
table FloatingPoint {
3434
precision: Precision;
@@ -91,6 +91,17 @@ union Type {
9191
JSONScalar
9292
}
9393

94+
enum VectorType: short {
95+
/// used in List type Dense Union and variable length primitive types (String, Binary)
96+
OFFSET,
97+
/// fixed length primitive values
98+
VALUES,
99+
/// Bit vector indicated if each value is null
100+
VALIDITY,
101+
/// Type vector used in Union type
102+
TYPE
103+
}
104+
94105
/// ----------------------------------------------------------------------
95106
/// A field represents a named column in a record / row batch or child of a
96107
/// nested type.
@@ -109,12 +120,16 @@ table Field {
109120
dictionary: long;
110121
// children apply only to Nested data types like Struct, List and Union
111122
children: [Field];
123+
/// the buffers produced for this type (as derived from the Type)
124+
/// does not include children
125+
/// each recordbatch will return instances of those Buffers.
126+
buffers: [ VectorType ];
112127
}
113128

114129
/// ----------------------------------------------------------------------
115130
/// Endianness of the platform that produces the RecordBatch
116131

117-
enum Endianness:int { Little, Big }
132+
enum Endianness:short { Little, Big }
118133

119134
/// ----------------------------------------------------------------------
120135
/// A Schema describes the columns in a row batch

java/format/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@
106106
<argument>-o</argument>
107107
<argument>target/generated-sources/</argument>
108108
<argument>../../format/Message.fbs</argument>
109+
<argument>../../format/File.fbs</argument>
109110
</arguments>
110111
</configuration>
111112
</execution>

java/memory/src/main/java/io/netty/buffer/ArrowBuf.java

Lines changed: 42 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
*/
1818
package io.netty.buffer;
1919

20-
import io.netty.util.internal.PlatformDependent;
21-
2220
import java.io.IOException;
2321
import java.io.InputStream;
2422
import java.io.OutputStream;
@@ -30,16 +28,18 @@
3028
import java.util.concurrent.atomic.AtomicInteger;
3129
import java.util.concurrent.atomic.AtomicLong;
3230

31+
import org.apache.arrow.memory.AllocationManager.BufferLedger;
3332
import org.apache.arrow.memory.BaseAllocator;
33+
import org.apache.arrow.memory.BaseAllocator.Verbosity;
3434
import org.apache.arrow.memory.BoundsChecking;
3535
import org.apache.arrow.memory.BufferAllocator;
3636
import org.apache.arrow.memory.BufferManager;
37-
import org.apache.arrow.memory.AllocationManager.BufferLedger;
38-
import org.apache.arrow.memory.BaseAllocator.Verbosity;
3937
import org.apache.arrow.memory.util.HistoricalLog;
4038

4139
import com.google.common.base.Preconditions;
4240

41+
import io.netty.util.internal.PlatformDependent;
42+
4343
public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
4444
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ArrowBuf.class);
4545

@@ -307,7 +307,7 @@ public ByteOrder order() {
307307
}
308308

309309
@Override
310-
public ByteBuf order(ByteOrder endianness) {
310+
public ArrowBuf order(ByteOrder endianness) {
311311
return this;
312312
}
313313

@@ -344,7 +344,7 @@ public ByteBuf copy(int index, int length) {
344344
}
345345

346346
@Override
347-
public ByteBuf slice() {
347+
public ArrowBuf slice() {
348348
return slice(readerIndex(), readableBytes());
349349
}
350350

@@ -467,7 +467,7 @@ public boolean equals(Object obj) {
467467
}
468468

469469
@Override
470-
public ByteBuf retain(int increment) {
470+
public ArrowBuf retain(int increment) {
471471
Preconditions.checkArgument(increment > 0, "retain(%d) argument is not positive", increment);
472472

473473
if (isEmpty) {
@@ -484,7 +484,7 @@ public ByteBuf retain(int increment) {
484484
}
485485

486486
@Override
487-
public ByteBuf retain() {
487+
public ArrowBuf retain() {
488488
return retain(1);
489489
}
490490

@@ -535,109 +535,109 @@ public short getShort(int index) {
535535
}
536536

537537
@Override
538-
public ByteBuf setShort(int index, int value) {
538+
public ArrowBuf setShort(int index, int value) {
539539
chk(index, 2);
540540
PlatformDependent.putShort(addr(index), (short) value);
541541
return this;
542542
}
543543

544544
@Override
545-
public ByteBuf setInt(int index, int value) {
545+
public ArrowBuf setInt(int index, int value) {
546546
chk(index, 4);
547547
PlatformDependent.putInt(addr(index), value);
548548
return this;
549549
}
550550

551551
@Override
552-
public ByteBuf setLong(int index, long value) {
552+
public ArrowBuf setLong(int index, long value) {
553553
chk(index, 8);
554554
PlatformDependent.putLong(addr(index), value);
555555
return this;
556556
}
557557

558558
@Override
559-
public ByteBuf setChar(int index, int value) {
559+
public ArrowBuf setChar(int index, int value) {
560560
chk(index, 2);
561561
PlatformDependent.putShort(addr(index), (short) value);
562562
return this;
563563
}
564564

565565
@Override
566-
public ByteBuf setFloat(int index, float value) {
566+
public ArrowBuf setFloat(int index, float value) {
567567
chk(index, 4);
568568
PlatformDependent.putInt(addr(index), Float.floatToRawIntBits(value));
569569
return this;
570570
}
571571

572572
@Override
573-
public ByteBuf setDouble(int index, double value) {
573+
public ArrowBuf setDouble(int index, double value) {
574574
chk(index, 8);
575575
PlatformDependent.putLong(addr(index), Double.doubleToRawLongBits(value));
576576
return this;
577577
}
578578

579579
@Override
580-
public ByteBuf writeShort(int value) {
580+
public ArrowBuf writeShort(int value) {
581581
ensure(2);
582582
PlatformDependent.putShort(addr(writerIndex), (short) value);
583583
writerIndex += 2;
584584
return this;
585585
}
586586

587587
@Override
588-
public ByteBuf writeInt(int value) {
588+
public ArrowBuf writeInt(int value) {
589589
ensure(4);
590590
PlatformDependent.putInt(addr(writerIndex), value);
591591
writerIndex += 4;
592592
return this;
593593
}
594594

595595
@Override
596-
public ByteBuf writeLong(long value) {
596+
public ArrowBuf writeLong(long value) {
597597
ensure(8);
598598
PlatformDependent.putLong(addr(writerIndex), value);
599599
writerIndex += 8;
600600
return this;
601601
}
602602

603603
@Override
604-
public ByteBuf writeChar(int value) {
604+
public ArrowBuf writeChar(int value) {
605605
ensure(2);
606606
PlatformDependent.putShort(addr(writerIndex), (short) value);
607607
writerIndex += 2;
608608
return this;
609609
}
610610

611611
@Override
612-
public ByteBuf writeFloat(float value) {
612+
public ArrowBuf writeFloat(float value) {
613613
ensure(4);
614614
PlatformDependent.putInt(addr(writerIndex), Float.floatToRawIntBits(value));
615615
writerIndex += 4;
616616
return this;
617617
}
618618

619619
@Override
620-
public ByteBuf writeDouble(double value) {
620+
public ArrowBuf writeDouble(double value) {
621621
ensure(8);
622622
PlatformDependent.putLong(addr(writerIndex), Double.doubleToRawLongBits(value));
623623
writerIndex += 8;
624624
return this;
625625
}
626626

627627
@Override
628-
public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
628+
public ArrowBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
629629
udle.getBytes(index + offset, dst, dstIndex, length);
630630
return this;
631631
}
632632

633633
@Override
634-
public ByteBuf getBytes(int index, ByteBuffer dst) {
634+
public ArrowBuf getBytes(int index, ByteBuffer dst) {
635635
udle.getBytes(index + offset, dst);
636636
return this;
637637
}
638638

639639
@Override
640-
public ByteBuf setByte(int index, int value) {
640+
public ArrowBuf setByte(int index, int value) {
641641
chk(index, 1);
642642
PlatformDependent.putByte(addr(index), (byte) value);
643643
return this;
@@ -699,13 +699,13 @@ protected void _setLong(int index, long value) {
699699
}
700700

701701
@Override
702-
public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
702+
public ArrowBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
703703
udle.getBytes(index + offset, dst, dstIndex, length);
704704
return this;
705705
}
706706

707707
@Override
708-
public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
708+
public ArrowBuf getBytes(int index, OutputStream out, int length) throws IOException {
709709
udle.getBytes(index + offset, out, length);
710710
return this;
711711
}
@@ -724,12 +724,12 @@ public int getBytes(int index, GatheringByteChannel out, int length) throws IOEx
724724
}
725725

726726
@Override
727-
public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
727+
public ArrowBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
728728
udle.setBytes(index + offset, src, srcIndex, length);
729729
return this;
730730
}
731731

732-
public ByteBuf setBytes(int index, ByteBuffer src, int srcIndex, int length) {
732+
public ArrowBuf setBytes(int index, ByteBuffer src, int srcIndex, int length) {
733733
if (src.isDirect()) {
734734
checkIndex(index, length);
735735
PlatformDependent.copyMemory(PlatformDependent.directBufferAddress(src) + srcIndex, this.memoryAddress() + index,
@@ -749,13 +749,13 @@ public ByteBuf setBytes(int index, ByteBuffer src, int srcIndex, int length) {
749749
}
750750

751751
@Override
752-
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
752+
public ArrowBuf setBytes(int index, byte[] src, int srcIndex, int length) {
753753
udle.setBytes(index + offset, src, srcIndex, length);
754754
return this;
755755
}
756756

757757
@Override
758-
public ByteBuf setBytes(int index, ByteBuffer src) {
758+
public ArrowBuf setBytes(int index, ByteBuffer src) {
759759
udle.setBytes(index + offset, src);
760760
return this;
761761
}
@@ -860,4 +860,17 @@ public void print(StringBuilder sb, int indent, Verbosity verbosity) {
860860
}
861861
}
862862

863+
@Override
864+
public ArrowBuf readerIndex(int readerIndex) {
865+
super.readerIndex(readerIndex);
866+
return this;
867+
}
868+
869+
@Override
870+
public ArrowBuf writerIndex(int writerIndex) {
871+
super.writerIndex(writerIndex);
872+
return this;
873+
}
874+
875+
863876
}

java/vector/src/main/codegen/data/ArrowTypes.tdd

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,15 @@
3030
},
3131
{
3232
name: "Union",
33-
fields: []
33+
fields: [{name: "mode", type: short}]
3434
},
3535
{
3636
name: "Int",
3737
fields: [{name: "bitWidth", type: int}, {name: "isSigned", type: boolean}]
3838
},
3939
{
4040
name: "FloatingPoint",
41-
fields: [{name: precision, type: int}]
41+
fields: [{name: precision, type: short}]
4242
},
4343
{
4444
name: "Utf8",

0 commit comments

Comments
 (0)