diff --git a/src/duckdb/src/common/adbc/adbc.cpp b/src/duckdb/src/common/adbc/adbc.cpp index a86f32562..8cf16141d 100644 --- a/src/duckdb/src/common/adbc/adbc.cpp +++ b/src/duckdb/src/common/adbc/adbc.cpp @@ -113,6 +113,7 @@ struct DuckDBAdbcStatementWrapper { duckdb_connection connection; duckdb_prepared_statement statement; char *ingestion_table_name; + char *target_catalog; char *db_schema; ArrowArrayStream ingestion_stream; IngestionMode ingestion_mode = IngestionMode::CREATE; @@ -725,16 +726,36 @@ void stream_schema(ArrowArrayStream *stream, ArrowSchema &schema) { } // Helper function to build CREATE TABLE SQL statement -static std::string BuildCreateTableSQL(const char *schema, const char *table_name, +static std::string BuildCreateTableSQL(const char *catalog, const char *schema, const char *table_name, const duckdb::vector &types, - const duckdb::vector &names, bool if_not_exists = false) { + const duckdb::vector &names, bool if_not_exists = false, + bool temporary = false, bool replace = false) { std::ostringstream create_table; - create_table << "CREATE TABLE "; + if (replace) { + create_table << "CREATE OR REPLACE "; + } else { + create_table << "CREATE "; + } + if (temporary) { + create_table << "TEMP "; + } + create_table << "TABLE "; if (if_not_exists) { create_table << "IF NOT EXISTS "; } - if (schema) { - create_table << duckdb::KeywordHelper::WriteOptionallyQuoted(schema) << "."; + // Note: DuckDB resolves two-part names as either catalog.table (default schema) + // or schema.table depending on context. This can become ambiguous if a schema and + // an attached catalog share a name. Callers should prefer passing an explicit + // schema (or defaulting to "main") to produce an unambiguous three-part name. + // For TEMP tables, specifying catalog/schema in the CREATE statement is not allowed; + // the table is automatically placed in the temp catalog. + if (!temporary) { + if (catalog) { + create_table << duckdb::KeywordHelper::WriteOptionallyQuoted(catalog) << "."; + } + if (schema) { + create_table << duckdb::KeywordHelper::WriteOptionallyQuoted(schema) << "."; + } } create_table << duckdb::KeywordHelper::WriteOptionallyQuoted(table_name) << " ("; for (idx_t i = 0; i < types.size(); i++) { @@ -748,18 +769,7 @@ static std::string BuildCreateTableSQL(const char *schema, const char *table_nam return create_table.str(); } -// Helper function to build DROP TABLE IF EXISTS SQL statement -static std::string BuildDropTableSQL(const char *schema, const char *table_name) { - std::ostringstream drop_table; - drop_table << "DROP TABLE IF EXISTS "; - if (schema) { - drop_table << duckdb::KeywordHelper::WriteOptionallyQuoted(schema) << "."; - } - drop_table << duckdb::KeywordHelper::WriteOptionallyQuoted(table_name); - return drop_table.str(); -} - -AdbcStatusCode Ingest(duckdb_connection connection, const char *table_name, const char *schema, +AdbcStatusCode Ingest(duckdb_connection connection, const char *catalog, const char *table_name, const char *schema, struct ArrowArrayStream *input, struct AdbcError *error, IngestionMode ingestion_mode, bool temporary, int64_t *rows_affected) { if (!connection) { @@ -775,10 +785,31 @@ AdbcStatusCode Ingest(duckdb_connection connection, const char *table_name, cons return ADBC_STATUS_INVALID_ARGUMENT; } if (schema && temporary) { - // Temporary option is not supported with ADBC_INGEST_OPTION_TARGET_DB_SCHEMA or - // ADBC_INGEST_OPTION_TARGET_CATALOG + // Temporary option is not supported with ADBC_INGEST_OPTION_TARGET_DB_SCHEMA SetError(error, "Temporary option is not supported with schema"); - return ADBC_STATUS_INVALID_ARGUMENT; + return ADBC_STATUS_INVALID_STATE; + } + if (catalog && temporary) { + // Temporary option is not supported with ADBC_INGEST_OPTION_TARGET_CATALOG + SetError(error, "Temporary option is not supported with catalog"); + return ADBC_STATUS_INVALID_STATE; + } + + // Resolve target name parts. + // Used for both SQL generation (CREATE/DROP) and appender lookup. + // Prefer explicit three-part names; two-part names can be ambiguous. + const char *effective_catalog = catalog; + const char *effective_schema = schema; + if (temporary) { + // Temporary tables live in the special "temp" catalog. + // "CREATE TEMP TABLE" automatically places tables in temp.main. + // For the appender, we need to explicitly target the temp catalog. + effective_catalog = "temp"; + effective_schema = nullptr; + } else if (catalog && !schema) { + // Default schema for attached catalogs (DEFAULT_SCHEMA). + // Use catalog.main.table to avoid catalog/schema name ambiguity. + effective_schema = "main"; } duckdb::ArrowSchemaWrapper arrow_schema_wrapper; @@ -800,7 +831,7 @@ AdbcStatusCode Ingest(duckdb_connection connection, const char *table_name, cons switch (ingestion_mode) { case IngestionMode::CREATE: { // CREATE mode: Create table, error if already exists - auto sql = BuildCreateTableSQL(schema, table_name, types, names); + auto sql = BuildCreateTableSQL(effective_catalog, effective_schema, table_name, types, names, false, temporary); duckdb_result result; if (duckdb_query(connection, sql.c_str(), &result) == DuckDBError) { const char *error_msg = duckdb_result_error(&result); @@ -820,16 +851,10 @@ AdbcStatusCode Ingest(duckdb_connection connection, const char *table_name, cons // The appender will naturally fail if the table doesn't exist break; case IngestionMode::REPLACE: { - // REPLACE mode: Drop table if exists, then create - auto drop_sql = BuildDropTableSQL(schema, table_name); - auto create_sql = BuildCreateTableSQL(schema, table_name, types, names); + // REPLACE mode: CREATE OR REPLACE TABLE + auto create_sql = + BuildCreateTableSQL(effective_catalog, effective_schema, table_name, types, names, false, temporary, true); duckdb_result result; - if (duckdb_query(connection, drop_sql.c_str(), &result) == DuckDBError) { - SetError(error, duckdb_result_error(&result)); - duckdb_destroy_result(&result); - return ADBC_STATUS_INTERNAL; - } - duckdb_destroy_result(&result); if (duckdb_query(connection, create_sql.c_str(), &result) == DuckDBError) { SetError(error, duckdb_result_error(&result)); duckdb_destroy_result(&result); @@ -840,7 +865,7 @@ AdbcStatusCode Ingest(duckdb_connection connection, const char *table_name, cons } case IngestionMode::CREATE_APPEND: { // CREATE_APPEND mode: Create if not exists, append if exists - auto sql = BuildCreateTableSQL(schema, table_name, types, names, true); + auto sql = BuildCreateTableSQL(effective_catalog, effective_schema, table_name, types, names, true, temporary); duckdb_result result; if (duckdb_query(connection, sql.c_str(), &result) == DuckDBError) { SetError(error, duckdb_result_error(&result)); @@ -851,7 +876,7 @@ AdbcStatusCode Ingest(duckdb_connection connection, const char *table_name, cons break; } } - AppenderWrapper appender(connection, schema, table_name); + AppenderWrapper appender(connection, effective_catalog, effective_schema, table_name); if (!appender.Valid()) { return ADBC_STATUS_INTERNAL; } @@ -919,6 +944,7 @@ AdbcStatusCode StatementNew(struct AdbcConnection *connection, struct AdbcStatem statement_wrapper->statement = nullptr; statement_wrapper->ingestion_stream.release = nullptr; statement_wrapper->ingestion_table_name = nullptr; + statement_wrapper->target_catalog = nullptr; statement_wrapper->db_schema = nullptr; statement_wrapper->temporary_table = false; @@ -943,6 +969,10 @@ AdbcStatusCode StatementRelease(struct AdbcStatement *statement, struct AdbcErro free(wrapper->ingestion_table_name); wrapper->ingestion_table_name = nullptr; } + if (wrapper->target_catalog) { + free(wrapper->target_catalog); + wrapper->target_catalog = nullptr; + } if (wrapper->db_schema) { free(wrapper->db_schema); wrapper->db_schema = nullptr; @@ -1019,8 +1049,9 @@ static AdbcStatusCode IngestToTableFromBoundStream(DuckDBAdbcStatementWrapper *s auto stream = statement->ingestion_stream; // Ingest into a table from the bound stream - return Ingest(statement->connection, statement->ingestion_table_name, statement->db_schema, &stream, error, - statement->ingestion_mode, statement->temporary_table, rows_affected); + return Ingest(statement->connection, statement->target_catalog, statement->ingestion_table_name, + statement->db_schema, &stream, error, statement->ingestion_mode, statement->temporary_table, + rows_affected); } AdbcStatusCode StatementExecuteQuery(struct AdbcStatement *statement, struct ArrowArrayStream *out, @@ -1354,15 +1385,25 @@ AdbcStatusCode StatementSetOption(struct AdbcStatement *statement, const char *k auto wrapper = static_cast(statement->private_data); if (strcmp(key, ADBC_INGEST_OPTION_TARGET_TABLE) == 0) { + if (wrapper->ingestion_table_name) { + free(wrapper->ingestion_table_name); + } wrapper->ingestion_table_name = strdup(value); - wrapper->temporary_table = false; return ADBC_STATUS_OK; } if (strcmp(key, ADBC_INGEST_OPTION_TEMPORARY) == 0) { if (strcmp(value, ADBC_OPTION_VALUE_ENABLED) == 0) { + // Align with arrow-adbc PostgreSQL driver behavior: if a schema was set + // before enabling temporary ingestion, clear it so temporary can proceed. + // (Some clients set schema by default.) if (wrapper->db_schema) { - SetError(error, "Temporary option is not supported with schema"); - return ADBC_STATUS_INVALID_ARGUMENT; + free(wrapper->db_schema); + wrapper->db_schema = nullptr; + } + // Some clients may also set a catalog by default; clear it so temporary can proceed. + if (wrapper->target_catalog) { + free(wrapper->target_catalog); + wrapper->target_catalog = nullptr; } wrapper->temporary_table = true; return ADBC_STATUS_OK; @@ -1378,14 +1419,21 @@ AdbcStatusCode StatementSetOption(struct AdbcStatement *statement, const char *k } if (strcmp(key, ADBC_INGEST_OPTION_TARGET_DB_SCHEMA) == 0) { - if (wrapper->temporary_table) { - SetError(error, "Temporary option is not supported with schema"); - return ADBC_STATUS_INVALID_ARGUMENT; + if (wrapper->db_schema) { + free(wrapper->db_schema); } wrapper->db_schema = strdup(value); return ADBC_STATUS_OK; } + if (strcmp(key, ADBC_INGEST_OPTION_TARGET_CATALOG) == 0) { + if (wrapper->target_catalog) { + free(wrapper->target_catalog); + } + wrapper->target_catalog = strdup(value); + return ADBC_STATUS_OK; + } + if (strcmp(key, ADBC_INGEST_OPTION_MODE) == 0) { if (strcmp(value, ADBC_INGEST_OPTION_MODE_CREATE) == 0) { wrapper->ingestion_mode = IngestionMode::CREATE; diff --git a/src/duckdb/src/execution/physical_plan_generator.cpp b/src/duckdb/src/execution/physical_plan_generator.cpp index 74b420a34..30a28ea70 100644 --- a/src/duckdb/src/execution/physical_plan_generator.cpp +++ b/src/duckdb/src/execution/physical_plan_generator.cpp @@ -185,4 +185,11 @@ PhysicalOperator &PhysicalPlanGenerator::CreatePlan(LogicalOperator &op) { throw InternalException("Physical plan generator - no plan generated"); } +ArenaAllocator &PhysicalPlanGenerator::ArenaRef() { + if (!physical_plan) { + physical_plan = make_uniq(Allocator::Get(context)); + } + return physical_plan->ArenaRef(); +} + } // namespace duckdb diff --git a/src/duckdb/src/function/table/version/pragma_version.cpp b/src/duckdb/src/function/table/version/pragma_version.cpp index 43972a5ea..494b967cb 100644 --- a/src/duckdb/src/function/table/version/pragma_version.cpp +++ b/src/duckdb/src/function/table/version/pragma_version.cpp @@ -1,5 +1,5 @@ #ifndef DUCKDB_PATCH_VERSION -#define DUCKDB_PATCH_VERSION "0-dev5446" +#define DUCKDB_PATCH_VERSION "0-dev5469" #endif #ifndef DUCKDB_MINOR_VERSION #define DUCKDB_MINOR_VERSION 5 @@ -8,10 +8,10 @@ #define DUCKDB_MAJOR_VERSION 1 #endif #ifndef DUCKDB_VERSION -#define DUCKDB_VERSION "v1.5.0-dev5446" +#define DUCKDB_VERSION "v1.5.0-dev5469" #endif #ifndef DUCKDB_SOURCE_ID -#define DUCKDB_SOURCE_ID "13e1f2229c" +#define DUCKDB_SOURCE_ID "f5174f0d6d" #endif #include "duckdb/function/table/system_functions.hpp" #include "duckdb/main/database.hpp" diff --git a/src/duckdb/src/include/duckdb.h b/src/duckdb/src/include/duckdb.h index 05c7094e6..30e240159 100644 --- a/src/duckdb/src/include/duckdb.h +++ b/src/duckdb/src/include/duckdb.h @@ -3456,6 +3456,20 @@ This allows NULL values to be written to the vector, regardless of whether a val */ DUCKDB_C_API void duckdb_vector_ensure_validity_writable(duckdb_vector vector); +/*! +Safely assigns a string element in the vector at the specified location. Supersedes +`duckdb_vector_assign_string_element`. The vector type must be VARCHAR and the input must be valid UTF-8. Otherwise, it +returns an invalid Unicode error. + +* @param vector The vector to alter +* @param index The row position in the vector to assign the string to +* @param str The null-terminated string +* @return If valid UTF-8, then `nullptr`, else error information. If not `nullptr`, then the return value must be +destroyed with `duckdb_destroy_error_data`. +*/ +DUCKDB_C_API duckdb_error_data duckdb_vector_safe_assign_string_element(duckdb_vector vector, idx_t index, + const char *str); + /*! Assigns a string element in the vector at the specified location. diff --git a/src/duckdb/src/include/duckdb/common/adbc/adbc.hpp b/src/duckdb/src/include/duckdb/common/adbc/adbc.hpp index 391744f48..75532e7bf 100644 --- a/src/duckdb/src/include/duckdb/common/adbc/adbc.hpp +++ b/src/duckdb/src/include/duckdb/common/adbc/adbc.hpp @@ -18,10 +18,18 @@ namespace duckdb_adbc { class AppenderWrapper { public: - AppenderWrapper(duckdb_connection conn, const char *schema, const char *table) : appender(nullptr) { - if (duckdb_appender_create(conn, schema, table, &appender) != DuckDBSuccess) { - appender = nullptr; + AppenderWrapper(duckdb_connection conn, const char *catalog, const char *schema, const char *table) + : appender(nullptr) { + // Note: duckdb_appender_create_ext allocates an internal wrapper even on failure. + // If creation fails, make sure to destroy it to avoid leaking. + auto created = duckdb_appender(nullptr); + if (duckdb_appender_create_ext(conn, catalog, schema, table, &created) != DuckDBSuccess) { + if (created) { + duckdb_appender_destroy(&created); + } + return; } + appender = created; } ~AppenderWrapper() { if (appender) { diff --git a/src/duckdb/src/include/duckdb/execution/physical_plan_generator.hpp b/src/duckdb/src/include/duckdb/execution/physical_plan_generator.hpp index 1a1080e7b..7de6cbb0f 100644 --- a/src/duckdb/src/include/duckdb/execution/physical_plan_generator.hpp +++ b/src/duckdb/src/include/duckdb/execution/physical_plan_generator.hpp @@ -100,6 +100,10 @@ class PhysicalPlanGenerator { return physical_plan->Make(std::forward(args)...); } + //! Get a reference to the ArenaAllocator of the underlying physical plan. + //! Creates a new (empty) physical plan if none exists yet. + ArenaAllocator &ArenaRef(); + public: PhysicalOperator &ResolveDefaultsProjection(LogicalInsert &op, PhysicalOperator &child); diff --git a/src/duckdb/src/include/duckdb/main/capi/extension_api.hpp b/src/duckdb/src/include/duckdb/main/capi/extension_api.hpp index c90e14c76..b32dd04df 100644 --- a/src/duckdb/src/include/duckdb/main/capi/extension_api.hpp +++ b/src/duckdb/src/include/duckdb/main/capi/extension_api.hpp @@ -671,6 +671,7 @@ typedef struct { sel_t *(*duckdb_selection_vector_get_data_ptr)(duckdb_selection_vector sel); void (*duckdb_vector_copy_sel)(duckdb_vector src, duckdb_vector dst, duckdb_selection_vector sel, idx_t src_count, idx_t src_offset, idx_t dst_offset); + duckdb_error_data (*duckdb_vector_safe_assign_string_element)(duckdb_vector vector, idx_t index, const char *str); } duckdb_ext_api_v1; //===--------------------------------------------------------------------===// @@ -1221,6 +1222,7 @@ inline duckdb_ext_api_v1 CreateAPIv1() { result.duckdb_destroy_selection_vector = duckdb_destroy_selection_vector; result.duckdb_selection_vector_get_data_ptr = duckdb_selection_vector_get_data_ptr; result.duckdb_vector_copy_sel = duckdb_vector_copy_sel; + result.duckdb_vector_safe_assign_string_element = duckdb_vector_safe_assign_string_element; return result; } diff --git a/src/duckdb/src/include/duckdb/storage/caching_file_system.hpp b/src/duckdb/src/include/duckdb/storage/caching_file_system.hpp index 53e7083f2..87f5db5ec 100644 --- a/src/duckdb/src/include/duckdb/storage/caching_file_system.hpp +++ b/src/duckdb/src/include/duckdb/storage/caching_file_system.hpp @@ -9,6 +9,7 @@ #pragma once #include "duckdb/common/enums/cache_validation_mode.hpp" +#include "duckdb/common/file_opener.hpp" #include "duckdb/common/file_open_flags.hpp" #include "duckdb/common/open_file_info.hpp" #include "duckdb/common/winapi.hpp" @@ -35,7 +36,7 @@ struct CachingFileHandle { public: DUCKDB_API CachingFileHandle(QueryContext context, CachingFileSystem &caching_file_system, const OpenFileInfo &path, - FileOpenFlags flags, CachedFile &cached_file); + FileOpenFlags flags, optional_ptr opener, CachedFile &cached_file); DUCKDB_API ~CachingFileHandle(); public: @@ -89,6 +90,8 @@ struct CachingFileHandle { OpenFileInfo path; //! Flags used to open the file FileOpenFlags flags; + //! File opener, which contains file open context. + optional_ptr opener; //! Cache validation mode for this file CacheValidationMode validate; //! The associated CachedFile with cached ranges @@ -119,9 +122,10 @@ class CachingFileSystem { public: DUCKDB_API static CachingFileSystem Get(ClientContext &context); - DUCKDB_API unique_ptr OpenFile(const OpenFileInfo &path, FileOpenFlags flags); + DUCKDB_API unique_ptr OpenFile(const OpenFileInfo &path, FileOpenFlags flags, + optional_ptr opener = nullptr); DUCKDB_API unique_ptr OpenFile(QueryContext context, const OpenFileInfo &path, - FileOpenFlags flags); + FileOpenFlags flags, optional_ptr opener = nullptr); private: //! The Client FileSystem (needs to be client-specific so we can do, e.g., HTTPFS profiling) diff --git a/src/duckdb/src/include/duckdb_extension.h b/src/duckdb/src/include/duckdb_extension.h index c77522c97..c023bd8a1 100644 --- a/src/duckdb/src/include/duckdb_extension.h +++ b/src/duckdb/src/include/duckdb_extension.h @@ -776,6 +776,7 @@ typedef struct { sel_t *(*duckdb_selection_vector_get_data_ptr)(duckdb_selection_vector sel); void (*duckdb_vector_copy_sel)(duckdb_vector src, duckdb_vector dst, duckdb_selection_vector sel, idx_t src_count, idx_t src_offset, idx_t dst_offset); + duckdb_error_data (*duckdb_vector_safe_assign_string_element)(duckdb_vector vector, idx_t index, const char *str); #endif } duckdb_ext_api_v1; @@ -1366,15 +1367,16 @@ typedef struct { #define duckdb_create_union_value duckdb_ext_api.duckdb_create_union_value // Version unstable_new_vector_functions -#define duckdb_create_vector duckdb_ext_api.duckdb_create_vector -#define duckdb_destroy_vector duckdb_ext_api.duckdb_destroy_vector -#define duckdb_slice_vector duckdb_ext_api.duckdb_slice_vector -#define duckdb_vector_copy_sel duckdb_ext_api.duckdb_vector_copy_sel -#define duckdb_vector_reference_value duckdb_ext_api.duckdb_vector_reference_value -#define duckdb_vector_reference_vector duckdb_ext_api.duckdb_vector_reference_vector -#define duckdb_create_selection_vector duckdb_ext_api.duckdb_create_selection_vector -#define duckdb_destroy_selection_vector duckdb_ext_api.duckdb_destroy_selection_vector -#define duckdb_selection_vector_get_data_ptr duckdb_ext_api.duckdb_selection_vector_get_data_ptr +#define duckdb_create_vector duckdb_ext_api.duckdb_create_vector +#define duckdb_destroy_vector duckdb_ext_api.duckdb_destroy_vector +#define duckdb_vector_safe_assign_string_element duckdb_ext_api.duckdb_vector_safe_assign_string_element +#define duckdb_slice_vector duckdb_ext_api.duckdb_slice_vector +#define duckdb_vector_copy_sel duckdb_ext_api.duckdb_vector_copy_sel +#define duckdb_vector_reference_value duckdb_ext_api.duckdb_vector_reference_value +#define duckdb_vector_reference_vector duckdb_ext_api.duckdb_vector_reference_vector +#define duckdb_create_selection_vector duckdb_ext_api.duckdb_create_selection_vector +#define duckdb_destroy_selection_vector duckdb_ext_api.duckdb_destroy_selection_vector +#define duckdb_selection_vector_get_data_ptr duckdb_ext_api.duckdb_selection_vector_get_data_ptr //===--------------------------------------------------------------------===// // Struct Global Macros diff --git a/src/duckdb/src/main/capi/data_chunk-c.cpp b/src/duckdb/src/main/capi/data_chunk-c.cpp index 77f6482ab..9183858ca 100644 --- a/src/duckdb/src/main/capi/data_chunk-c.cpp +++ b/src/duckdb/src/main/capi/data_chunk-c.cpp @@ -1,7 +1,8 @@ +#include "duckdb/common/type_visitor.hpp" #include "duckdb/common/types/data_chunk.hpp" #include "duckdb/common/types/string_type.hpp" #include "duckdb/main/capi/capi_internal.hpp" -#include "duckdb/common/type_visitor.hpp" +#include "utf8proc_wrapper.hpp" #include @@ -136,6 +137,29 @@ void duckdb_vector_ensure_validity_writable(duckdb_vector vector) { validity.EnsureWritable(); } +duckdb_error_data duckdb_vector_safe_assign_string_element(duckdb_vector vector, idx_t index, const char *str) { + if (!vector) { + return nullptr; + } + + auto v = reinterpret_cast(vector); + idx_t str_len = strlen(str); + + // UTF-8 analysis for VARCHAR vectors, which expect valid UTF-8. + if (v->GetType().id() == duckdb::LogicalTypeId::VARCHAR) { + duckdb::UnicodeInvalidReason reason; + size_t pos; + auto utf_type = duckdb::Utf8Proc::Analyze(str, str_len, &reason, &pos); + if (utf_type == duckdb::UnicodeType::INVALID) { + return duckdb_create_error_data(DUCKDB_ERROR_INVALID_INPUT, + "invalid Unicode detected, str must be valid UTF-8"); + } + } + auto data = duckdb::FlatVector::GetData(*v); + data[index] = duckdb::StringVector::AddStringOrBlob(*v, str, str_len); + return nullptr; +} + void duckdb_vector_assign_string_element(duckdb_vector vector, idx_t index, const char *str) { duckdb_vector_assign_string_element_len(vector, index, str, strlen(str)); } diff --git a/src/duckdb/src/optimizer/window_self_join.cpp b/src/duckdb/src/optimizer/window_self_join.cpp index b7b171757..a3879b27e 100644 --- a/src/duckdb/src/optimizer/window_self_join.cpp +++ b/src/duckdb/src/optimizer/window_self_join.cpp @@ -1,14 +1,11 @@ #include "duckdb/optimizer/window_self_join.hpp" #include "duckdb/optimizer/optimizer.hpp" #include "duckdb/planner/binder.hpp" -#include "duckdb/planner/operator/logical_filter.hpp" #include "duckdb/planner/operator/logical_window.hpp" #include "duckdb/planner/operator/logical_comparison_join.hpp" #include "duckdb/planner/operator/logical_aggregate.hpp" #include "duckdb/planner/operator/logical_projection.hpp" #include "duckdb/planner/expression/bound_window_expression.hpp" -#include "duckdb/planner/expression/bound_comparison_expression.hpp" -#include "duckdb/planner/expression/bound_constant_expression.hpp" #include "duckdb/planner/expression/bound_columnref_expression.hpp" #include "duckdb/planner/expression/bound_aggregate_expression.hpp" #include "duckdb/function/aggregate_function.hpp" @@ -82,158 +79,110 @@ unique_ptr WindowSelfJoinOptimizer::Optimize(unique_ptr WindowSelfJoinOptimizer::OptimizeInternal(unique_ptr op, ColumnBindingReplacer &replacer) { - if (op->type == LogicalOperatorType::LOGICAL_FILTER) { - auto &filter = op->Cast(); - if (filter.expressions.size() == 1 && filter.children.size() == 1 && - filter.children[0]->type == LogicalOperatorType::LOGICAL_WINDOW) { - auto &window = filter.children[0]->Cast(); + if (op->type == LogicalOperatorType::LOGICAL_WINDOW) { + auto &window = op->Cast(); - // Check recursively - window.children[0] = OptimizeInternal(std::move(window.children[0]), replacer); + // Check recursively + window.children[0] = OptimizeInternal(std::move(window.children[0]), replacer); - if (window.expressions.size() != 1) { - return op; - } - if (window.expressions[0]->type != ExpressionType::WINDOW_AGGREGATE) { - return op; - } + if (window.expressions.size() != 1) { + return op; + } + if (window.expressions[0]->type != ExpressionType::WINDOW_AGGREGATE) { + return op; + } - // We can only optimize if there is a single window function equality comparison - // Check matches - if (filter.expressions[0]->type != ExpressionType::COMPARE_EQUAL) { - return op; - } - auto &comp = filter.expressions[0]->Cast(); - if (comp.left->type != ExpressionType::BOUND_COLUMN_REF) { - return op; - } - auto &col_ref = comp.left->Cast(); - auto t_idx = col_ref.binding.table_index; - auto c_idx = col_ref.binding.column_index; - auto w_idx = window.window_index; + auto &w_expr = window.expressions[0]->Cast(); + if (w_expr.aggregate->name != "count" && w_expr.aggregate->name != "count_star") { + return op; + } + if (!w_expr.orders.empty()) { + return op; + } + if (w_expr.partitions.empty()) { + return op; + } - if (t_idx != w_idx || c_idx != 0) { - return op; - } + // --- Transformation --- - // Check right side is constant 1 - if (comp.right->type != ExpressionType::VALUE_CONSTANT) { - return op; - } - auto &const_expr = comp.right->Cast(); - if (!const_expr.value.type().IsIntegral()) { - return op; - } - if (const_expr.value.GetValue() != 1) { - return op; - } + auto original_child = std::move(window.children[0]); + auto copy_child = original_child->Copy(optimizer.context); - auto &w_expr = window.expressions[0]->Cast(); - if (w_expr.aggregate->name != "count" && w_expr.aggregate->name != "count_star") { - return op; - } - if (!w_expr.orders.empty()) { - return op; - } - if (w_expr.partitions.empty()) { - return op; - } - - // --- Transformation --- + // Rebind copy_child to avoid duplicate table indices + WindowSelfJoinTableRebinder rebinder(optimizer); + rebinder.VisitOperator(*copy_child); - auto original_child = std::move(window.children[0]); - auto copy_child = original_child->Copy(optimizer.context); + auto aggregate_index = optimizer.binder.GenerateTableIndex(); + auto group_index = optimizer.binder.GenerateTableIndex(); - // Rebind copy_child to avoid duplicate table indices - WindowSelfJoinTableRebinder rebinder(optimizer); - rebinder.VisitOperator(*copy_child); + vector> groups; + vector> aggregates; - auto aggregate_index = optimizer.binder.GenerateTableIndex(); - auto group_index = optimizer.binder.GenerateTableIndex(); - - vector> groups; - vector> aggregates; + // Create Aggregate Operator + for (auto &part : w_expr.partitions) { + auto part_copy = part->Copy(); + rebinder.VisitExpression(&part_copy); // Update bindings + groups.push_back(std::move(part_copy)); + } - // Create Aggregate Operator - for (auto &part : w_expr.partitions) { - auto part_copy = part->Copy(); - rebinder.VisitExpression(&part_copy); // Update bindings - groups.push_back(std::move(part_copy)); - } + auto count_func = *w_expr.aggregate; + unique_ptr bind_info; + if (w_expr.bind_info) { + bind_info = w_expr.bind_info->Copy(); + } else { + bind_info = nullptr; + } - auto count_func = *w_expr.aggregate; - unique_ptr bind_info; - if (w_expr.bind_info) { - bind_info = w_expr.bind_info->Copy(); - } else { - bind_info = nullptr; - } + vector> children; + for (auto &child : w_expr.children) { + auto child_copy = child->Copy(); + rebinder.VisitExpression(&child_copy); // Update bindings + children.push_back(std::move(child_copy)); + } - vector> children; - for (auto &child : w_expr.children) { - auto child_copy = child->Copy(); - rebinder.VisitExpression(&child_copy); // Update bindings - children.push_back(std::move(child_copy)); - } + auto aggr_type = w_expr.distinct ? AggregateType::DISTINCT : AggregateType::NON_DISTINCT; - auto aggr_type = w_expr.distinct ? AggregateType::DISTINCT : AggregateType::NON_DISTINCT; + auto agg_expr = make_uniq(std::move(count_func), std::move(children), nullptr, + std::move(bind_info), aggr_type); - auto agg_expr = make_uniq(std::move(count_func), std::move(children), nullptr, - std::move(bind_info), aggr_type); + aggregates.push_back(std::move(agg_expr)); - aggregates.push_back(std::move(agg_expr)); + // args: group_index, aggregate_index, ... + auto agg_op = make_uniq(group_index, aggregate_index, std::move(aggregates)); - // args: group_index, aggregate_index, ... - auto agg_op = make_uniq(group_index, aggregate_index, std::move(aggregates)); + agg_op->groups = std::move(groups); + agg_op->children.push_back(std::move(copy_child)); + agg_op->ResolveOperatorTypes(); - agg_op->groups = std::move(groups); - agg_op->children.push_back(std::move(copy_child)); - agg_op->ResolveOperatorTypes(); + if (agg_op->types.size() <= agg_op->groups.size()) { + throw InternalException("LogicalAggregate types size mismatch"); + } - if (agg_op->types.size() <= agg_op->groups.size()) { - throw InternalException("LogicalAggregate types size mismatch"); - } + // Inner Join on the partition keys + auto join = make_uniq(JoinType::INNER); - // Filter on aggregate: count = 1 - // Count is the first aggregate, so it's at agg_op->groups.size() in the types list - // Bindings: Aggregates are at aggregate_index - ColumnBinding new_binding(aggregate_index, 0); - auto cnt_ref = make_uniq(agg_op->types[agg_op->groups.size()], new_binding); - - auto filter_expr = - make_uniq(ExpressionType::COMPARE_EQUAL, std::move(cnt_ref), - make_uniq(Value::BIGINT(1))); - - auto rhs_filter = make_uniq(); - rhs_filter->expressions.push_back(std::move(filter_expr)); - rhs_filter->children.push_back(std::move(agg_op)); - rhs_filter->ResolveOperatorTypes(); - - // Inner Join on the partition keys - auto join = make_uniq(JoinType::INNER); - - for (size_t i = 0; i < w_expr.partitions.size(); ++i) { - JoinCondition cond; - cond.comparison = ExpressionType::COMPARE_NOT_DISTINCT_FROM; - cond.left = w_expr.partitions[i]->Copy(); - cond.right = make_uniq(w_expr.partitions[i]->return_type, - ColumnBinding(group_index, i)); - join->conditions.push_back(std::move(cond)); - } + for (size_t i = 0; i < w_expr.partitions.size(); ++i) { + JoinCondition cond; + cond.comparison = ExpressionType::COMPARE_NOT_DISTINCT_FROM; + cond.left = w_expr.partitions[i]->Copy(); + cond.right = + make_uniq(w_expr.partitions[i]->return_type, ColumnBinding(group_index, i)); + join->conditions.push_back(std::move(cond)); + } - join->children.push_back(std::move(original_child)); - join->children.push_back(std::move(rhs_filter)); - join->ResolveOperatorTypes(); + join->children.push_back(std::move(original_child)); + join->children.push_back(std::move(agg_op)); + join->ResolveOperatorTypes(); - // Replace Count binding - // Old window column: (window.window_index, 0) - // New constant column: (aggregate_index, 0) - ColumnBinding old_binding(window.window_index, 0); + // Replace Count binding + // Old window column: (window.window_index, 0) + // New constant column: (aggregate_index, 0) + ColumnBinding old_binding(window.window_index, 0); + ColumnBinding new_binding(aggregate_index, 0); - replacer.replacement_bindings.emplace_back(old_binding, new_binding); + replacer.replacement_bindings.emplace_back(old_binding, new_binding); - return std::move(join); - } + return std::move(join); } else if (!op->children.empty()) { for (auto &child : op->children) { child = OptimizeInternal(std::move(child), replacer); diff --git a/src/duckdb/src/storage/caching_file_system.cpp b/src/duckdb/src/storage/caching_file_system.cpp index d98eeb42c..1d979377e 100644 --- a/src/duckdb/src/storage/caching_file_system.cpp +++ b/src/duckdb/src/storage/caching_file_system.cpp @@ -59,21 +59,23 @@ CachingFileSystem CachingFileSystem::Get(ClientContext &context) { return CachingFileSystem(FileSystem::GetFileSystem(context), *context.db); } -unique_ptr CachingFileSystem::OpenFile(const OpenFileInfo &path, FileOpenFlags flags) { - return make_uniq(QueryContext(), *this, path, flags, +unique_ptr CachingFileSystem::OpenFile(const OpenFileInfo &path, FileOpenFlags flags, + optional_ptr opener) { + return make_uniq(QueryContext(), *this, path, flags, opener, external_file_cache.GetOrCreateCachedFile(path.path)); } unique_ptr CachingFileSystem::OpenFile(QueryContext context, const OpenFileInfo &path, - FileOpenFlags flags) { - return make_uniq(context, *this, path, flags, + FileOpenFlags flags, optional_ptr opener) { + return make_uniq(context, *this, path, flags, opener, external_file_cache.GetOrCreateCachedFile(path.path)); } CachingFileHandle::CachingFileHandle(QueryContext context, CachingFileSystem &caching_file_system_p, - const OpenFileInfo &path_p, FileOpenFlags flags_p, CachedFile &cached_file_p) + const OpenFileInfo &path_p, FileOpenFlags flags_p, + optional_ptr opener_p, CachedFile &cached_file_p) : context(context), caching_file_system(caching_file_system_p), - external_file_cache(caching_file_system.external_file_cache), path(path_p), flags(flags_p), + external_file_cache(caching_file_system.external_file_cache), path(path_p), flags(flags_p), opener(opener_p), validate( ExternalFileCacheUtil::GetCacheValidationMode(path_p, context.GetClientContext(), caching_file_system_p.db)), cached_file(cached_file_p), position(0) { @@ -95,7 +97,7 @@ CachingFileHandle::~CachingFileHandle() { FileHandle &CachingFileHandle::GetFileHandle() { if (!file_handle) { - file_handle = caching_file_system.file_system.OpenFile(path, flags); + file_handle = caching_file_system.file_system.OpenFile(path, flags, opener); last_modified = caching_file_system.file_system.GetLastModifiedTime(*file_handle); version_tag = caching_file_system.file_system.GetVersionTag(*file_handle); diff --git a/src/duckdb/src/storage/caching_file_system_wrapper.cpp b/src/duckdb/src/storage/caching_file_system_wrapper.cpp index 875551389..a6c8278b4 100644 --- a/src/duckdb/src/storage/caching_file_system_wrapper.cpp +++ b/src/duckdb/src/storage/caching_file_system_wrapper.cpp @@ -137,7 +137,7 @@ unique_ptr CachingFileSystemWrapper::OpenFileExtended(const OpenFile } if (ShouldUseCache(path.path)) { - auto caching_handle = caching_file_system.OpenFile(path, flags); + auto caching_handle = caching_file_system.OpenFile(path, flags, opener); return make_uniq(shared_from_this(), std::move(caching_handle), flags); } // Bypass cache, use underlying file system directly.