Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2;

/**
* The base interface for data source v2 implementations.
*
* Note that this is an empty interface, data source implementations should mix-in at least one of
* the plug-in interfaces like `ReadSupport`. Otherwise it's just a dummy data source which is
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use an actual link ...

* un-readable/writable.
*/
public interface DataSourceV2 {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

/**
* An immutable string-to-string map in which keys are case-insensitive. This is used to represent
* data source options.
*/
public class DataSourceV2Options {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a simple test suite for this

private final Map<String, String> keyLowerCasedMap;

private String toLowerCase(String key) {
return key.toLowerCase(Locale.ROOT);
}

public DataSourceV2Options(Map<String, String> originalMap) {
keyLowerCasedMap = new HashMap<>(originalMap.size());
for (Map.Entry<String, String> entry : originalMap.entrySet()) {
keyLowerCasedMap.put(toLowerCase(entry.getKey()), entry.getValue());
}
}

/**
* Returns the option value to which the specified key is mapped, case-insensitively.
*/
public Optional<String> get(String key) {
return Optional.ofNullable(keyLowerCasedMap.get(toLowerCase(key)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2;

import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;

/**
* A mix-in interface for `DataSourceV2`. Users can implement this interface to provide data reading
Copy link
Contributor

@rxin rxin Sep 14, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Users -> data source implementers

Actually a better one is

"Data sources can implement"

* ability and scan the data from the data source.
*/
public interface ReadSupport {

/**
* Creates a `DataSourceV2Reader` to scan the data for this data source.
*
* @param options the options for this data source reader, which is an immutable case-insensitive
* string-to-string map.
* @return a reader that implements the actual read logic.
*/
DataSourceV2Reader createReader(DataSourceV2Options options);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2;

import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.types.StructType;

/**
* A mix-in interface for `DataSourceV2`. Users can implement this interface to provide data reading
* ability and scan the data from the data source.
*
* This is a variant of `ReadSupport` that accepts user-specified schema when reading data. A data
* source can implement both `ReadSupport` and `ReadSupportWithSchema` if it supports both schema
* inference and user-specified schema.
*/
public interface ReadSupportWithSchema {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still find ReadSupport vs ReadSupportWithSchema pretty confusing. But let's address that separately.


/**
* Create a `DataSourceV2Reader` to scan the data for this data source.
*
* @param schema the full schema of this data source reader. Full schema usually maps to the
* physical schema of the underlying storage of this data source reader, e.g.
* CSV files, JSON files, etc, while this reader may not read data with full
* schema, as column pruning or other optimizations may happen.
* @param options the options for this data source reader, which is an immutable case-insensitive
* string-to-string map.
* @return a reader that implements the actual read logic.
*/
DataSourceV2Reader createReader(StructType schema, DataSourceV2Options options);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.reader;

import java.io.Closeable;

/**
* A data reader returned by a read task and is responsible for outputting data for a RDD partition.
*/
public interface DataReader<T> extends Closeable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what can T be?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currently it can be Row, UnsafeRow, ColumnarBatch.

Copy link
Contributor

@rxin rxin Sep 13, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Document this and link it back to whatever method it is.

Also I'd still add an explicit init or open, and explicitly declare close in addition to closeable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The initialization is done when creating this DataReader from a ReadTask. That ensures that the initialization happens (easy to forget open()) and simplifies the checks that need to be done because DataReader can't exist otherwise.


/**
* Proceed to next record, returns false if there is no more records.
*/
boolean next();

/**
* Return the current record. This method should return same value until `next` is called.
*/
T get();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.reader;

import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;

/**
* A data source reader that can mix in various query optimization interfaces and implement these
* optimizations. The actual scan logic should be delegated to `ReadTask`s that are returned by
* this data source reader.
*
* There are mainly 3 kinds of query optimizations:
* 1. Operators push-down. E.g., filter push-down, required columns push-down(aka column
* pruning), etc. These push-down interfaces are named like `SupportsPushDownXXX`.
* 2. Information Reporting. E.g., statistics reporting, ordering reporting, etc. These
* reporting interfaces are named like `SupportsReportingXXX`.
* 3. Special scan. E.g, columnar scan, unsafe row scan, etc. Note that a data source reader can
* implement at most one special scan. These scan interfaces are named like `SupportsScanXXX`.
*
* Spark first applies all operator push-down optimizations that this data source supports. Then
* Spark collects information this data source reported for further optimizations. Finally Spark
* issues the scan request and does the actual data reading.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: this is not true now, as we push down operators at the planning phase. We need to do some refactor and move it to the optimizing phase.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be really nice imho.

*/
public interface DataSourceV2Reader {

/**
* Returns the actual schema of this data source reader, which may be different from the physical
* schema of the underlying storage, as column pruning or other optimizations may happen.
*/
StructType readSchema();

/**
* Returns a list of read tasks. Each task is responsible for outputting data for one RDD
* partition. That means the number of tasks returned here is same as the number of RDD
* partitions this scan outputs.
*
* Note that, this may not be a full scan if the data source reader mixes in other optimization
* interfaces like column pruning, filter push-down, etc. These optimizations are applied before
* Spark issues the scan request.
*/
List<ReadTask<Row>> createReadTasks();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.reader;

import java.io.Serializable;

/**
* A read task returned by a data source reader and is responsible to create the data reader.
* The relationship between `ReadTask` and `DataReader` is similar to `Iterable` and `Iterator`.
*
* Note that, the read task will be serialized and sent to executors, then the data reader will be
* created on executors and do the actual reading.
*/
public interface ReadTask<T> extends Serializable {

/**
* The preferred locations where this read task can run faster, but Spark does not guarantee that
* this task will always run on these locations. The implementations should make sure that it can
* be run on any location. The location is a string representing the host name of an executor.
*/
default String[] preferredLocations() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what format are these strings expected to be in? If Spark will be placing this ReadTask onto an executor that is a preferred location, the format will need to be a documented part of the API

are there levels of preference, or only the binary? I'm thinking node vs rack vs datacenter for on-prem clusters, or instance vs AZ vs region for cloud clusers

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These have previously only been ip/hostnames. To match the RDD definition I think we would have to continue with that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This API matches the RDD.preferredLocations directly, I'll add more documents here.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we have a class Host which represents this? Just makes the API more clear.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmmm, do you mean create a Host class which only has a string field?

return new String[0];
}

/**
* Returns a data reader to do the actual reading work for this read task.
*/
DataReader<T> createReader();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.reader;

import java.util.OptionalLong;

/**
* An interface to represent statistics for a data source.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

link back to SupportsReportStatistics

*/
public interface Statistics {
OptionalLong sizeInBytes();
OptionalLong numRows();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.reader;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.catalyst.expressions.Expression;

/**
* A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to push down
* arbitrary expressions as predicates to the data source. This is an experimental and unstable
* interface as `Expression` is not public and may get changed in future Spark versions.
*
* Note that, if users implement both this interface and `SupportsPushDownFilters`, Spark will
* ignore `SupportsPushDownFilters` and only process this interface.
*/
@Experimental
@InterfaceStability.Unstable
public interface SupportsPushDownCatalystFilters {

/**
* Pushes down filters, and returns unsupported filters.
*/
Expression[] pushCatalystFilters(Expression[] filters);
}
Loading