[SPARK-28238][SQL] Implement DESCRIBE TABLE for Data Source V2 Tables. #25040
Conversation
Test build #107145 has finished for PR 25040 at commit
Test build #107193 has finished for PR 25040 at commit
Test build #107204 has finished for PR 25040 at commit
case DescribeColumnStatement(
    CatalogObjectIdentifier(Some(catalog), ident), colName, isExtended) =>
  throw new AnalysisException("Describing columns is not supported for v2 tables.")
Should this be supported eventually, or is it redundant if DESCRIBE TABLE is available?
I think we need to support it eventually, if only to keep parity with V1 tables.
DescribeTable(catalog.asTableCatalog, ident, isExtended)
case DropTableStatement(AsTableIdentifier(tableName), ifExists, purge) =>
This was missing? Are there no tests for DROP TABLE?
Oh this might have been a bad copy-paste artifact. Even if this is missing, it doesn't belong in this PR.
private def toCatalystRow(strs: String*): InternalRow = {
  val encoder = RowEncoder(DescribeTableSchemas.DESCRIBE_TABLE_SCHEMA).resolveAndBind()
Minor: I'd rather not create the encoder each time a row is created. Can you move this and the method to a companion object?
Sort of - a couple of questions:
- Is RowEncoder thread-safe?
- I noticed that if I create the RowEncoder but immediately resolveAndBind it, and then reuse the resolved encoder, the tests break because the describe returns incorrect rows. Presumably there's some kind of reused-memory issue here. I didn't look into it that thoroughly - I think we can just reuse the unresolved encoder and resolveAndBind before creating each row.
@cloud-fan, can you help answer these questions?
throw new ParseException("DESC TABLE COLUMN for a specific partition is not supported", ctx)
} else {
  DescribeColumnCommand(
    visitTableIdentifier(ctx.tableIdentifier),
Now that these rules create DescribeColumnStatement and DescribeTableStatement, they should be moved into Catalyst. There isn't anything specific to the implementation any more.
Do you mean that DescribeColumnCommand and DescribeTableCommand should be moved to Catalyst? The V1 commands depend on a bunch of stuff that's in core, such as SparkSession and DDLUtils.
Overall this looks good, but it doesn't move the parser rules to Catalyst. We've been trying to move as much as we can to Catalyst, to keep the parser and the SQL implementation separate instead of keeping them mixed together. That has also required moving the parser tests to Catalyst and moving the SparkSqlParser tests to a suite that tests parsing and conversion to v1 plans.
I moved the parser rules and created a helper object for the encoder.
| test("SPARK-17328 Fix NPE with EXPLAIN DESCRIBE TABLE") { | ||
| assertEqual("describe t", | ||
| DescribeTableCommand(TableIdentifier("t"), Map.empty, isExtended = false)) |
These test cases should be moved into catalyst as well.
+1. One minor comment, but otherwise this looks good to me.
Test build #107484 has finished for PR 25040 at commit
Test build #107490 has finished for PR 25040 at commit
Test build #107546 has finished for PR 25040 at commit
import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField, StructType}

private[sql] object DescribeTableSchemas {
  val DESCRIBE_TABLE_ATTRIBUTES = Seq(
We shouldn't define attributes in an object. AttributeReference will be assigned a unique ID when created, and in general we should create new attributes when creating a new logical plan.
For example, if you do df1 = sql("desc table t1"); df2 = sql("desc table "), then df1.join(df2) would fail.
Can you join the results of DESCRIBE?
scala> val df1 = sql("desc t1")
df1: org.apache.spark.sql.DataFrame = [col_name: string, data_type: string ... 1 more field]
scala> val df2 = sql("desc t2")
df2: org.apache.spark.sql.DataFrame = [col_name: string, data_type: string ... 1 more field]
scala> df1.crossJoin(df2).show
+--------+---------+-------+--------+---------+-------+
|col_name|data_type|comment|col_name|data_type|comment|
+--------+---------+-------+--------+---------+-------+
| i| int| null| j| int| null|
+--------+---------+-------+--------+---------+-------+
This is not a common use case but we don't have to break it.
Changed this from a value to a method, so it will generate new identifiers every time while still being shared amongst multiple contexts.
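For illustration, a minimal sketch of that value-to-method change (the object and method names here are illustrative, not the exact code in this PR):

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.StringType

private[sql] object DescribeTableSchema {
  // A def instead of a val: each call builds fresh AttributeReferences with new
  // expression IDs, so two DESCRIBE plans can coexist in one query (e.g. the
  // cross join shown above) while the definition stays shared in one place.
  def describeTableAttributes(): Seq[AttributeReference] = Seq(
    AttributeReference("col_name", StringType, nullable = false)(),
    AttributeReference("data_type", StringType, nullable = false)(),
    AttributeReference("comment", StringType, nullable = true)())
}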
    catalog: TableCatalog,
    ident: Identifier,
    isExtended: Boolean) extends Command {
  override lazy val output = DescribeTableSchemas.DESCRIBE_TABLE_ATTRIBUTES
We don't need lazy val here, as it's not a heavy computation.
    ident: Identifier,
    isExtended: Boolean) extends Command {
  override lazy val output = DescribeTableSchemas.DESCRIBE_TABLE_ATTRIBUTES
  override lazy val schema = DescribeTableSchemas.DESCRIBE_TABLE_SCHEMA
By default, schema is StructType.fromAttributes(output), so we don't need to override it.
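A small sketch of that simplification, reusing the hypothetical DescribeTableSchema helper sketched above (the class shape is simplified, not the exact PR code):

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.Command

// Only output is overridden; QueryPlan already derives schema as
// StructType.fromAttributes(output), so no separate schema override is needed.
case class DescribeTableSketch(tableName: String, isExtended: Boolean) extends Command {
  override def output: Seq[Attribute] = DescribeTableSchema.describeTableAttributes()
}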
val DESCRIBE_TABLE_SCHEMA = StructType(
  DESCRIBE_TABLE_ATTRIBUTES.map(attr => StructField(attr.name, attr.dataType, attr.nullable)))
nit: StructType.fromAttributes(DESCRIBE_TABLE_ATTRIBUTES)
} else {
  rows += toCatalystRow(s"Table $ident does not exist.", "", "")
Shouldn't we throw an exception when the table is not found?
I think we can follow #24937: DescribeTable should contain an UnresolvedRelation, so that the analyzer can check table existence for us.
Followed the AlterTable approach in the latest commit.
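Roughly, the AlterTable-style shape looks like the sketch below; the field list, package locations, and helper names are approximations for illustration, not the merged code.

import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog}
import org.apache.spark.sql.catalyst.analysis.NamedRelation
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}

// The relation is kept as a child of the command, so if the table doesn't exist
// the plan stays unresolved and the analyzer reports the missing table before
// the physical DescribeTableExec ever runs.
case class DescribeTable(
    catalog: TableCatalog,
    ident: Identifier,
    table: NamedRelation,
    isExtended: Boolean) extends Command {
  override def children: Seq[LogicalPlan] = Seq(table)
  override def output: Seq[Attribute] = DescribeTableSchema.describeTableAttributes()
}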
Ah but I didn't remove this - though I guess technically we should never hit this code path. We can throw an exception here instead.
private val EMPTY_ROW = toCatalystRow("", "", "")

private def toCatalystRow(strs: String*): InternalRow = {
  ENCODER.resolveAndBind().toRow(
The encoder only needs to call resolveAndBind once.
I don't necessarily think so, but it could also be how this class is built. I think the encoder's state needs to be reset. When I don't resolveAndBind every time, the tests yield wrong results entirely.
Fixed it - we have to copy the rows generated by the encoder since the encoder re-uses the same memory space.
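A sketch of that fix (the object and member names are illustrative, not the exact PR code): bind the encoder once, and copy every row it produces.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object DescribeTableRows {
  private val schema = StructType(Seq(
    StructField("col_name", StringType, nullable = false),
    StructField("data_type", StringType, nullable = false),
    StructField("comment", StringType, nullable = true)))

  // Resolve and bind once; the bound encoder is then reused for every row
  // (assuming rows are produced from a single thread).
  private lazy val encoder = RowEncoder(schema).resolveAndBind()

  def toCatalystRow(strs: String*): InternalRow = {
    // copy() is the important part: the bound encoder serializes each row into the
    // same reusable buffer, so without copying, later rows overwrite earlier ones.
    encoder.toRow(new GenericRowWithSchema(strs.toArray[Any], schema)).copy()
  }
}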
case DescribeTable(catalog, ident, _, isExtended) =>
  DescribeTableExec(catalog, ident, isExtended) :: Nil
How about:
case DescribeTable(catalog, ident, r: DataSourceV2Relation, isExtended) =>
  DescribeTableExec(r.table, ident, isExtended) :: Nil
Then we don't need to look up the table again in DescribeTableExec.
Test build #108063 has finished for PR 25040 at commit
Test build #108064 has finished for PR 25040 at commit
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField, StructType}

private[sql] object DescribeTableSchemas {
Singular, DescribeTableSchema?
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
These are getting really really long, and in particular the merge conflicts are a bit tedious to resolve. I'm normally very averse to wildcard imports, but there might come a point where we'll have to do that. Or I wonder if we could have a helper object that bundles all of these, or factory methods for these, or matchers... somehow.
+1 for wildcard here.
This package may make sense for a wildcard import because it has no sub-packages and is unlikely to in the future. Still, because Scala will import sub-packages, I think it's probably best to keep avoiding wildcard imports, even here.
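For comparison, the wildcard form under discussion (the option not taken here) would collapse the whole list into a single line:

import org.apache.spark.sql.catalyst.plans.logical.sql._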
Everything that's been brought up has been taken care of. Let me know if there's anything else to address.
    override val properties: util.Map[String, String])
  extends Table with SupportsRead with SupportsWrite {

  def this(
Where do we use this new constructor?
cloud-fan left a comment:
LGTM except a few minor comments
Test build #108358 has finished for PR 25040 at commit
Test build #108362 has finished for PR 25040 at commit
@cloud-fan is this good to merge?
Ah sorry, I missed a few things - I'll address them.
Test build #108729 has finished for PR 25040 at commit
thanks, merging to master!
What changes were proposed in this pull request?
Implements the DESCRIBE TABLE logical and physical plans for data source v2 tables.
How was this patch tested?
Added unit tests to DataSourceV2SQLSuite.
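For illustration, a test in DataSourceV2SQLSuite for this feature might take roughly the following shape; the catalog name, table name, provider, and assertion are made up here and this is not the actual test code (it also assumes a suite where spark is available and a v2 catalog named "testcat" is configured in the session conf).

test("DescribeTable using v2 catalog") {
  spark.sql("CREATE TABLE testcat.table_name (id bigint, data string) USING foo")
  val description = spark.sql("DESCRIBE TABLE testcat.table_name").collect()
  // Expect one output row per column, with col_name / data_type / comment fields.
  assert(description.map(_.getString(0)).contains("id"))
}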