DataSourceV2.java
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2;

import org.apache.spark.annotation.InterfaceStability;

/**
* The base interface for data source v2. Implementations must have a public, no-argument
* constructor.
*
* Note that this is an empty interface. Data source implementations should mix in at least one of
* the plug-in interfaces like {@link ReadSupport}; otherwise the data source is just a dummy that
* can be neither read nor written.
*/
@InterfaceStability.Evolving
public interface DataSourceV2 {}
DataSourceV2Options.java
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

import org.apache.spark.annotation.InterfaceStability;

/**
* An immutable string-to-string map in which keys are case-insensitive. This is used to represent
* data source options.
*/
@InterfaceStability.Evolving
public class DataSourceV2Options {
private final Map<String, String> keyLowerCasedMap;

private String toLowerCase(String key) {
return key.toLowerCase(Locale.ROOT);
}

public DataSourceV2Options(Map<String, String> originalMap) {
keyLowerCasedMap = new HashMap<>(originalMap.size());
for (Map.Entry<String, String> entry : originalMap.entrySet()) {
keyLowerCasedMap.put(toLowerCase(entry.getKey()), entry.getValue());
}
}

/**
* Returns the option value to which the specified key is mapped, case-insensitively.
*/
public Optional<String> get(String key) {
return Optional.ofNullable(keyLowerCasedMap.get(toLowerCase(key)));
}
}
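For illustration, a short usage sketch of the case-insensitive lookup described above. The class name OptionsExample and the "path" option are hypothetical and not part of this patch.

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.sources.v2.DataSourceV2Options;

public class OptionsExample {
  public static void main(String[] args) {
    Map<String, String> raw = new HashMap<>();
    raw.put("Path", "/tmp/data");               // stored with a capital P

    DataSourceV2Options options = new DataSourceV2Options(raw);

    // Lookups ignore case: both of these hit the same entry.
    System.out.println(options.get("path").get());           // /tmp/data
    System.out.println(options.get("PATH").get());           // /tmp/data

    // Missing keys come back as an empty Optional instead of null.
    System.out.println(options.get("missing").isPresent());  // false
  }
}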
ReadSupport.java
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;

/**
* A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
* provide data reading ability and scan the data from the data source.
*/
@InterfaceStability.Evolving
public interface ReadSupport {

/**
* Creates a {@link DataSourceV2Reader} to scan the data from this data source.
*
* @param options the options for this data source reader, which is an immutable case-insensitive
* string-to-string map.
* @return a reader that implements the actual read logic.
*/
DataSourceV2Reader createReader(DataSourceV2Options options);
}
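As a rough sketch of how the pieces above fit together, here is a hypothetical data source that mixes in ReadSupport and returns a reader producing no rows. EmptyDataSource and its "value" column are illustrative only; the implicit public no-argument constructor satisfies the DataSourceV2 contract.

import java.util.Collections;
import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.sources.v2.reader.ReadTask;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Hypothetical data source: readable (via ReadSupport) but always empty.
public class EmptyDataSource implements DataSourceV2, ReadSupport {

  @Override
  public DataSourceV2Reader createReader(DataSourceV2Options options) {
    return new DataSourceV2Reader() {
      @Override
      public StructType readSchema() {
        // The schema this source claims to produce.
        return new StructType().add("value", DataTypes.StringType);
      }

      @Override
      public List<ReadTask<Row>> createReadTasks() {
        // No read tasks means the resulting RDD has zero partitions and no data.
        return Collections.emptyList();
      }
    };
  }
}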
ReadSupportWithSchema.java
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.types.StructType;

/**
* A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
* provide data reading ability and scan the data from the data source.
*
* This is a variant of {@link ReadSupport} that accepts a user-specified schema when reading data.
* A data source can implement both {@link ReadSupport} and {@link ReadSupportWithSchema} if it
* supports both schema inference and user-specified schemas.
*/
@InterfaceStability.Evolving
public interface ReadSupportWithSchema {

/**
* Creates a {@link DataSourceV2Reader} to scan the data from this data source.
*
* @param schema the full schema of this data source reader. The full schema usually maps to the
*               physical schema of the underlying storage of this data source, e.g. CSV files,
*               JSON files, etc. Note that the reader may not scan all columns of the full
*               schema, as column pruning or other optimizations may apply.
* @param options the options for this data source reader, which is an immutable case-insensitive
* string-to-string map.
* @return a reader that implements the actual read logic.
*/
DataSourceV2Reader createReader(StructType schema, DataSourceV2Options options);
}
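A similarly minimal sketch for the user-specified-schema path: the reader simply reports back the schema it was given and returns no read tasks. UserSchemaDataSource is a hypothetical name, not part of the patch.

import java.util.Collections;
import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.sources.v2.reader.ReadTask;
import org.apache.spark.sql.types.StructType;

// Hypothetical data source that accepts whatever schema the user specifies.
public class UserSchemaDataSource implements DataSourceV2, ReadSupportWithSchema {

  @Override
  public DataSourceV2Reader createReader(StructType schema, DataSourceV2Options options) {
    return new DataSourceV2Reader() {
      @Override
      public StructType readSchema() {
        // The user-specified schema becomes the schema the reader reports.
        return schema;
      }

      @Override
      public List<ReadTask<Row>> createReadTasks() {
        return Collections.emptyList();  // empty source, kept minimal for illustration
      }
    };
  }
}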
DataReader.java
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.reader;

import java.io.Closeable;

import org.apache.spark.annotation.InterfaceStability;

/**
* A data reader returned by {@link ReadTask#createReader()}, responsible for outputting data for
* an RDD partition.
*/
@InterfaceStability.Evolving
public interface DataReader<T> extends Closeable {

/**
* Proceeds to the next record, and returns false if there are no more records.
*/
boolean next();

/**
* Returns the current record. This method should return the same value until `next` is called.
*/
T get();
}
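To make the next()/get()/close() contract concrete, here is a hypothetical helper that drains a DataReader into a list. Spark itself runs an equivalent loop on executors; the class and method names below are illustrative.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.sources.v2.reader.DataReader;

public class DataReaderExample {
  // Reads every remaining record from the given reader and closes it afterwards.
  public static <T> List<T> drain(DataReader<T> reader) throws IOException {
    List<T> records = new ArrayList<>();
    try {
      while (reader.next()) {        // advance first ...
        records.add(reader.get());   // ... then fetch the current record
      }
    } finally {
      reader.close();                // DataReader is Closeable: always release resources
    }
    return records;
  }
}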
DataSourceV2Reader.java
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.reader;

import java.util.List;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
import org.apache.spark.sql.types.StructType;

/**
* A data source reader that is returned by
* {@link ReadSupport#createReader(DataSourceV2Options)} or
* {@link ReadSupportWithSchema#createReader(StructType, DataSourceV2Options)}.
* It can mix in various query optimization interfaces to speed up the data scan. The actual scan
* logic should be delegated to {@link ReadTask}s that are returned by {@link #createReadTasks()}.
*
* There are mainly 3 kinds of query optimizations:
* 1. Operator push-down. E.g., filter push-down, required-column push-down (a.k.a. column
*    pruning), etc. These push-down interfaces are named like `SupportsPushDownXXX`.
* 2. Information reporting. E.g., statistics reporting, ordering reporting, etc. These
*    reporting interfaces are named like `SupportsReportingXXX`.
* 3. Special scans. E.g., columnar scan, unsafe row scan, etc. These scan interfaces are named
*    like `SupportsScanXXX`.
*
* Spark first applies all operator push-down optimizations that this data source supports. Then
* Spark collects information reported by the data source for further optimizations. Finally,
* Spark issues the scan request and does the actual data reading.
*/
@InterfaceStability.Evolving
public interface DataSourceV2Reader {

/**
* Returns the actual schema of this data source reader, which may be different from the physical
* schema of the underlying storage, as column pruning or other optimizations may happen.
*/
StructType readSchema();

/**
* Returns a list of read tasks. Each task is responsible for outputting data for one RDD
* partition. That means the number of tasks returned here is the same as the number of RDD
* partitions this scan outputs.
*
* Note that this may not be a full scan if the data source reader mixes in other optimization
* interfaces like column pruning, filter push-down, etc. These optimizations are applied before
* Spark issues the scan request.
*/
List<ReadTask<Row>> createReadTasks();
}
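A slightly fuller sketch of the driver/executor split described above: a hypothetical reader over the integers 0 to 9, split into two read tasks so the scan produces a two-partition RDD. RangeReader and RangeReadTask are illustrative names only, not part of the patch.

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.sources.v2.reader.ReadTask;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Built on the driver, e.g. by a ReadSupport#createReader implementation (not shown here).
public class RangeReader implements DataSourceV2Reader {

  @Override
  public StructType readSchema() {
    return new StructType().add("i", DataTypes.IntegerType);
  }

  @Override
  public List<ReadTask<Row>> createReadTasks() {
    // One task per output RDD partition.
    List<ReadTask<Row>> tasks = new ArrayList<>();
    tasks.add(new RangeReadTask(0, 5));   // partition 0: values 0..4
    tasks.add(new RangeReadTask(5, 10));  // partition 1: values 5..9
    return tasks;
  }
}

// Serialized and shipped to an executor, which calls createReader() to do the actual reading.
class RangeReadTask implements ReadTask<Row> {
  private final int start;
  private final int end;

  RangeReadTask(int start, int end) {
    this.start = start;
    this.end = end;
  }

  @Override
  public DataReader<Row> createReader() {
    return new DataReader<Row>() {
      private int current = start - 1;

      @Override
      public boolean next() {
        current += 1;
        return current < end;
      }

      @Override
      public Row get() {
        return RowFactory.create(current);
      }

      @Override
      public void close() {
        // nothing to release for an in-memory range
      }
    };
  }
}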
ReadTask.java
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.reader;

import java.io.Serializable;

import org.apache.spark.annotation.InterfaceStability;

/**
* A read task returned by {@link DataSourceV2Reader#createReadTasks()}, responsible for creating
* the actual data reader. The relationship between {@link ReadTask} and {@link DataReader} is
* similar to the relationship between {@link Iterable} and {@link java.util.Iterator}.
*
* Note that the read task will be serialized and sent to executors, and then the data reader will
* be created on the executors to do the actual reading.
*/
@InterfaceStability.Evolving
public interface ReadTask<T> extends Serializable {

/**
* The preferred locations where this read task can run faster, though Spark does not guarantee
* that the task will always run on these locations. Implementations should make sure the task can
* run on any location. Each location is a string representing the host name of an executor.
*/
default String[] preferredLocations() {
return new String[0];
}

/**
* Returns a data reader to do the actual reading work for this read task.
*/
DataReader<T> createReader();
}
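The locality hint can be illustrated with a hypothetical task that knows which hosts hold the data it reads; the host names are only a scheduling preference, and the record-less reader keeps the sketch self-contained. All names below are illustrative.

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.ReadTask;

// Hypothetical read task reporting where its data lives, e.g. the hosts holding a block replica.
class LocalityAwareReadTask implements ReadTask<Row> {
  private final String[] hosts;

  LocalityAwareReadTask(String[] hosts) {
    this.hosts = hosts;
  }

  @Override
  public String[] preferredLocations() {
    // A preference only: Spark may still schedule the task elsewhere, and it must work there too.
    return hosts;
  }

  @Override
  public DataReader<Row> createReader() {
    // An empty reader, kept trivial for illustration.
    return new DataReader<Row>() {
      @Override public boolean next() { return false; }
      @Override public Row get() { throw new IllegalStateException("no records"); }
      @Override public void close() { }
    };
  }
}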
Statistics.java
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.reader;

import java.util.OptionalLong;

import org.apache.spark.annotation.InterfaceStability;

/**
* An interface to represent statistics for a data source, which is returned by
* {@link SupportsReportStatistics#getStatistics()}.
*/
@InterfaceStability.Evolving
public interface Statistics {
OptionalLong sizeInBytes();
OptionalLong numRows();
}
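For example, a source that knows its on-disk size but not its row count could report statistics like the following hypothetical sketch. KnownSizeStatistics is an illustrative name; SupportsReportStatistics itself is not part of this excerpt.

import java.util.OptionalLong;

import org.apache.spark.sql.sources.v2.reader.Statistics;

class KnownSizeStatistics implements Statistics {
  private final long sizeInBytes;

  KnownSizeStatistics(long sizeInBytes) {
    this.sizeInBytes = sizeInBytes;
  }

  @Override
  public OptionalLong sizeInBytes() {
    return OptionalLong.of(sizeInBytes);
  }

  @Override
  public OptionalLong numRows() {
    // Row count is unknown, so report an empty value rather than a guess.
    return OptionalLong.empty();
  }
}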