Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 25 additions & 10 deletions cpp/src/arrow/adapters/orc/adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,16 @@ class OrcStripeReader : public RecordBatchReader {
int64_t batch_size_;
};

/// Builds the liborc::RowReaderOptions used as the starting point for reads:
/// default-constructed options with the reader timezone pinned to "GMT".
liborc::RowReaderOptions default_row_reader_options() {
  // The ORC timestamp type is easy to get wrong: values are serialized in the writer
  // timezone and converted to the reader timezone when read back. The Apache ORC C++
  // writer and reader both default to GMT so that no conversion takes place. The same
  // timezone is set explicitly here so that the behavior is visible to anyone reading
  // this code.
  liborc::RowReaderOptions row_reader_options;
  row_reader_options.setTimezoneName("GMT");
  return row_reader_options;
}

} // namespace

class ORCFileReader::Impl {
Expand Down Expand Up @@ -320,47 +330,47 @@ class ORCFileReader::Impl {
}

Result<std::shared_ptr<Table>> Read() {
liborc::RowReaderOptions opts;
liborc::RowReaderOptions opts = default_row_reader_options();
ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema());
return ReadTable(opts, schema);
}

Result<std::shared_ptr<Table>> Read(const std::shared_ptr<Schema>& schema) {
liborc::RowReaderOptions opts;
liborc::RowReaderOptions opts = default_row_reader_options();
return ReadTable(opts, schema);
}

Result<std::shared_ptr<Table>> Read(const std::vector<int>& include_indices) {
liborc::RowReaderOptions opts;
liborc::RowReaderOptions opts = default_row_reader_options();
RETURN_NOT_OK(SelectIndices(&opts, include_indices));
ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema(opts));
return ReadTable(opts, schema);
}

Result<std::shared_ptr<Table>> Read(const std::vector<std::string>& include_names) {
liborc::RowReaderOptions opts;
liborc::RowReaderOptions opts = default_row_reader_options();
RETURN_NOT_OK(SelectNames(&opts, include_names));
ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema(opts));
return ReadTable(opts, schema);
}

Result<std::shared_ptr<Table>> Read(const std::shared_ptr<Schema>& schema,
const std::vector<int>& include_indices) {
liborc::RowReaderOptions opts;
liborc::RowReaderOptions opts = default_row_reader_options();
RETURN_NOT_OK(SelectIndices(&opts, include_indices));
return ReadTable(opts, schema);
}

Result<std::shared_ptr<RecordBatch>> ReadStripe(int64_t stripe) {
liborc::RowReaderOptions opts;
liborc::RowReaderOptions opts = default_row_reader_options();
RETURN_NOT_OK(SelectStripe(&opts, stripe));
ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema(opts));
return ReadBatch(opts, schema, stripes_[stripe].num_rows);
}

Result<std::shared_ptr<RecordBatch>> ReadStripe(
int64_t stripe, const std::vector<int>& include_indices) {
liborc::RowReaderOptions opts;
liborc::RowReaderOptions opts = default_row_reader_options();
RETURN_NOT_OK(SelectIndices(&opts, include_indices));
RETURN_NOT_OK(SelectStripe(&opts, stripe));
ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema(opts));
Expand All @@ -369,7 +379,7 @@ class ORCFileReader::Impl {

Result<std::shared_ptr<RecordBatch>> ReadStripe(
int64_t stripe, const std::vector<std::string>& include_names) {
liborc::RowReaderOptions opts;
liborc::RowReaderOptions opts = default_row_reader_options();
RETURN_NOT_OK(SelectNames(&opts, include_names));
RETURN_NOT_OK(SelectStripe(&opts, stripe));
ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema(opts));
Expand Down Expand Up @@ -475,7 +485,7 @@ class ORCFileReader::Impl {
return nullptr;
}

liborc::RowReaderOptions opts;
liborc::RowReaderOptions opts = default_row_reader_options();
if (!include_indices.empty()) {
RETURN_NOT_OK(SelectIndices(&opts, include_indices));
}
Expand All @@ -496,7 +506,7 @@ class ORCFileReader::Impl {

Result<std::shared_ptr<RecordBatchReader>> GetRecordBatchReader(
int64_t batch_size, const std::vector<std::string>& include_names) {
liborc::RowReaderOptions opts;
liborc::RowReaderOptions opts = default_row_reader_options();
if (!include_names.empty()) {
RETURN_NOT_OK(SelectNames(&opts, include_names));
}
Expand Down Expand Up @@ -696,6 +706,11 @@ Result<liborc::WriterOptions> MakeOrcWriterOptions(
});
orc_options.setColumnsUseBloomFilter(std::move(orc_bloom_filter_columns));
orc_options.setBloomFilterFPP(options.bloom_filter_fpp);
// The ORC timestamp type is error-prone because values are serialized in the writer
// timezone and read back in the reader timezone. Both the Apache ORC C++ writer and
// reader default to GMT so that no conversion happens; we set the same timezone
// explicitly here so that anyone reading this code is aware of it.
orc_options.setTimezoneName("GMT");
switch (options.compression) {
case Compression::UNCOMPRESSED:
orc_options.setCompression(liborc::CompressionKind::CompressionKind_NONE);
Expand Down
34 changes: 15 additions & 19 deletions cpp/src/arrow/adapters/orc/adapter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,30 +124,25 @@ Result<std::shared_ptr<Array>> GenerateRandomDate64Array(int64_t size,
date64(), rand.Int64(size, kMilliMin, kMilliMax, null_probability));
}

Result<std::shared_ptr<Array>> GenerateRandomTimestampArray(int64_t size,
TimeUnit::type type,
double null_probability) {
Result<std::shared_ptr<Array>> GenerateRandomTimestampArray(
int64_t size, std::shared_ptr<TimestampType> type, double null_probability) {
random::RandomArrayGenerator rand(kRandomSeed);
switch (type) {
switch (type->unit()) {
case TimeUnit::type::SECOND: {
return CastInt64ArrayToTemporalArray<TimestampArray>(
timestamp(TimeUnit::SECOND),
rand.Int64(size, kSecondMin, kSecondMax, null_probability));
type, rand.Int64(size, kSecondMin, kSecondMax, null_probability));
}
case TimeUnit::type::MILLI: {
return CastInt64ArrayToTemporalArray<TimestampArray>(
timestamp(TimeUnit::MILLI),
rand.Int64(size, kMilliMin, kMilliMax, null_probability));
type, rand.Int64(size, kMilliMin, kMilliMax, null_probability));
}
case TimeUnit::type::MICRO: {
return CastInt64ArrayToTemporalArray<TimestampArray>(
timestamp(TimeUnit::MICRO),
rand.Int64(size, kMicroMin, kMicroMax, null_probability));
type, rand.Int64(size, kMicroMin, kMicroMax, null_probability));
}
case TimeUnit::type::NANO: {
return CastInt64ArrayToTemporalArray<TimestampArray>(
timestamp(TimeUnit::NANO),
rand.Int64(size, kNanoMin, kNanoMax, null_probability));
type, rand.Int64(size, kNanoMin, kNanoMax, null_probability));
}
default: {
return Status::TypeError("Unknown or unsupported Arrow TimeUnit: ", type);
Expand Down Expand Up @@ -196,12 +191,11 @@ std::shared_ptr<ChunkedArray> GenerateRandomChunkedArray(
break;
}
case Type::TIMESTAMP: {
EXPECT_OK_AND_ASSIGN(
arrays[j],
GenerateRandomTimestampArray(
current_size_chunks[j],
internal::checked_pointer_cast<TimestampType>(data_type)->unit(),
null_probability));
EXPECT_OK_AND_ASSIGN(arrays[j],
GenerateRandomTimestampArray(
current_size_chunks[j],
internal::checked_pointer_cast<TimestampType>(data_type),
null_probability));
break;
}
default:
Expand Down Expand Up @@ -486,7 +480,8 @@ class TestORCWriterTrivialNoConversion : public ::testing::Test {
field("int32", int32()), field("int64", int64()), field("float", float32()),
field("double", float64()), field("decimal128nz", decimal128(25, 6)),
field("decimal128z", decimal128(32, 0)), field("date32", date32()),
field("ts3", timestamp(TimeUnit::NANO)), field("string", utf8()),
field("ts3", timestamp(TimeUnit::NANO)),
field("ts4", timestamp(TimeUnit::NANO, "UTC")), field("string", utf8()),
field("binary", binary()),
field("struct", struct_({field("a", utf8()), field("b", int64())})),
field("list", list(int32())),
Expand Down Expand Up @@ -526,6 +521,7 @@ TEST_F(TestORCWriterTrivialNoConversion, writeFilledChunkAndSelectField) {
field("double", float64()),
field("date32", date32()),
field("ts3", timestamp(TimeUnit::NANO)),
field("ts4", timestamp(TimeUnit::NANO), "UTC"),
field("string", utf8()),
field("binary", binary()),
});
Expand Down
25 changes: 24 additions & 1 deletion cpp/src/arrow/adapters/orc/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ Status AppendBatch(const liborc::Type* type, liborc::ColumnVectorBatch* batch,
return AppendNumericBatchCast<Date32Builder, int32_t, liborc::LongVectorBatch,
int64_t>(batch, offset, length, builder);
case liborc::TIMESTAMP:
case liborc::TIMESTAMP_INSTANT:
return AppendTimestampBatch(batch, offset, length, builder);
case liborc::DECIMAL:
return AppendDecimalBatch(type, batch, offset, length, builder);
Expand Down Expand Up @@ -995,8 +996,17 @@ Result<std::unique_ptr<liborc::Type>> GetOrcType(const DataType& type) {
case Type::type::DATE32:
return liborc::createPrimitiveType(liborc::TypeKind::DATE);
case Type::type::DATE64:
case Type::type::TIMESTAMP:
return liborc::createPrimitiveType(liborc::TypeKind::TIMESTAMP);
case Type::type::TIMESTAMP: {
const auto& timestamp_type = checked_cast<const TimestampType&>(type);
if (!timestamp_type.timezone().empty()) {
// The timestamp values stored in the arrow array are normalized to UTC.
// TIMESTAMP_INSTANT type is always preferred over TIMESTAMP type.
return liborc::createPrimitiveType(liborc::TypeKind::TIMESTAMP_INSTANT);
}
// The timestamp values stored in the arrow array can be in any timezone.
return liborc::createPrimitiveType(liborc::TypeKind::TIMESTAMP);
}
case Type::type::DECIMAL128: {
const uint64_t precision =
static_cast<uint64_t>(checked_cast<const Decimal128Type&>(type).precision());
Expand Down Expand Up @@ -1111,7 +1121,20 @@ Result<std::shared_ptr<DataType>> GetArrowType(const liborc::Type* type) {
case liborc::CHAR:
return fixed_size_binary(static_cast<int>(type->getMaximumLength()));
case liborc::TIMESTAMP:
// Values of TIMESTAMP type are stored in the writer timezone in the Orc file.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added an extensive comment explaining the issue and the suggested usage. Please take another look. Thanks, @wjones127.

// Values are read back in the reader timezone. However, the writer timezone
// information in the ORC stripe footer is optional and may be missing. Moreover,
// stripes in the same ORC file may have different writer timezones (though this is
// unlikely). So we cannot tell the exact timezone of the values read back into the
// arrow::TimestampArray. In the adapter implementation, we set both the writer and
// the reader timezone to GMT (i.e. UTC) to avoid any conversion, so users get back
// the same values that were written. To avoid this burden altogether, the
// TIMESTAMP_INSTANT type is always preferred over the TIMESTAMP type.
return timestamp(TimeUnit::NANO);
case liborc::TIMESTAMP_INSTANT:
// Values of TIMESTAMP_INSTANT type are stored in the UTC timezone in the ORC file.
// Both read and write use the UTC timezone without any conversion.
return timestamp(TimeUnit::NANO, "UTC");
case liborc::DATE:
return date32();
case liborc::DECIMAL: {
Expand Down