Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 88 additions & 40 deletions src/duckdb/src/common/adbc/adbc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ struct DuckDBAdbcStatementWrapper {
duckdb_connection connection;
duckdb_prepared_statement statement;
char *ingestion_table_name;
char *target_catalog;
char *db_schema;
ArrowArrayStream ingestion_stream;
IngestionMode ingestion_mode = IngestionMode::CREATE;
Expand Down Expand Up @@ -725,16 +726,36 @@ void stream_schema(ArrowArrayStream *stream, ArrowSchema &schema) {
}

// Helper function to build CREATE TABLE SQL statement
static std::string BuildCreateTableSQL(const char *schema, const char *table_name,
static std::string BuildCreateTableSQL(const char *catalog, const char *schema, const char *table_name,
const duckdb::vector<duckdb::LogicalType> &types,
const duckdb::vector<std::string> &names, bool if_not_exists = false) {
const duckdb::vector<std::string> &names, bool if_not_exists = false,
bool temporary = false, bool replace = false) {
std::ostringstream create_table;
create_table << "CREATE TABLE ";
if (replace) {
create_table << "CREATE OR REPLACE ";
} else {
create_table << "CREATE ";
}
if (temporary) {
create_table << "TEMP ";
}
create_table << "TABLE ";
if (if_not_exists) {
create_table << "IF NOT EXISTS ";
}
if (schema) {
create_table << duckdb::KeywordHelper::WriteOptionallyQuoted(schema) << ".";
// Note: DuckDB resolves two-part names as either catalog.table (default schema)
// or schema.table depending on context. This can become ambiguous if a schema and
// an attached catalog share a name. Callers should prefer passing an explicit
// schema (or defaulting to "main") to produce an unambiguous three-part name.
// For TEMP tables, specifying catalog/schema in the CREATE statement is not allowed;
// the table is automatically placed in the temp catalog.
if (!temporary) {
if (catalog) {
create_table << duckdb::KeywordHelper::WriteOptionallyQuoted(catalog) << ".";
}
if (schema) {
create_table << duckdb::KeywordHelper::WriteOptionallyQuoted(schema) << ".";
}
}
create_table << duckdb::KeywordHelper::WriteOptionallyQuoted(table_name) << " (";
for (idx_t i = 0; i < types.size(); i++) {
Expand All @@ -748,18 +769,7 @@ static std::string BuildCreateTableSQL(const char *schema, const char *table_nam
return create_table.str();
}

// Helper function to build DROP TABLE IF EXISTS SQL statement
static std::string BuildDropTableSQL(const char *schema, const char *table_name) {
std::ostringstream drop_table;
drop_table << "DROP TABLE IF EXISTS ";
if (schema) {
drop_table << duckdb::KeywordHelper::WriteOptionallyQuoted(schema) << ".";
}
drop_table << duckdb::KeywordHelper::WriteOptionallyQuoted(table_name);
return drop_table.str();
}

AdbcStatusCode Ingest(duckdb_connection connection, const char *table_name, const char *schema,
AdbcStatusCode Ingest(duckdb_connection connection, const char *catalog, const char *table_name, const char *schema,
struct ArrowArrayStream *input, struct AdbcError *error, IngestionMode ingestion_mode,
bool temporary, int64_t *rows_affected) {
if (!connection) {
Expand All @@ -775,10 +785,31 @@ AdbcStatusCode Ingest(duckdb_connection connection, const char *table_name, cons
return ADBC_STATUS_INVALID_ARGUMENT;
}
if (schema && temporary) {
// Temporary option is not supported with ADBC_INGEST_OPTION_TARGET_DB_SCHEMA or
// ADBC_INGEST_OPTION_TARGET_CATALOG
// Temporary option is not supported with ADBC_INGEST_OPTION_TARGET_DB_SCHEMA
SetError(error, "Temporary option is not supported with schema");
return ADBC_STATUS_INVALID_ARGUMENT;
return ADBC_STATUS_INVALID_STATE;
}
if (catalog && temporary) {
// Temporary option is not supported with ADBC_INGEST_OPTION_TARGET_CATALOG
SetError(error, "Temporary option is not supported with catalog");
return ADBC_STATUS_INVALID_STATE;
}

// Resolve target name parts.
// Used for both SQL generation (CREATE/DROP) and appender lookup.
// Prefer explicit three-part names; two-part names can be ambiguous.
const char *effective_catalog = catalog;
const char *effective_schema = schema;
if (temporary) {
// Temporary tables live in the special "temp" catalog.
// "CREATE TEMP TABLE" automatically places tables in temp.main.
// For the appender, we need to explicitly target the temp catalog.
effective_catalog = "temp";
effective_schema = nullptr;
} else if (catalog && !schema) {
// Default schema for attached catalogs (DEFAULT_SCHEMA).
// Use catalog.main.table to avoid catalog/schema name ambiguity.
effective_schema = "main";
}

duckdb::ArrowSchemaWrapper arrow_schema_wrapper;
Expand All @@ -800,7 +831,7 @@ AdbcStatusCode Ingest(duckdb_connection connection, const char *table_name, cons
switch (ingestion_mode) {
case IngestionMode::CREATE: {
// CREATE mode: Create table, error if already exists
auto sql = BuildCreateTableSQL(schema, table_name, types, names);
auto sql = BuildCreateTableSQL(effective_catalog, effective_schema, table_name, types, names, false, temporary);
duckdb_result result;
if (duckdb_query(connection, sql.c_str(), &result) == DuckDBError) {
const char *error_msg = duckdb_result_error(&result);
Expand All @@ -820,16 +851,10 @@ AdbcStatusCode Ingest(duckdb_connection connection, const char *table_name, cons
// The appender will naturally fail if the table doesn't exist
break;
case IngestionMode::REPLACE: {
// REPLACE mode: Drop table if exists, then create
auto drop_sql = BuildDropTableSQL(schema, table_name);
auto create_sql = BuildCreateTableSQL(schema, table_name, types, names);
// REPLACE mode: CREATE OR REPLACE TABLE
auto create_sql =
BuildCreateTableSQL(effective_catalog, effective_schema, table_name, types, names, false, temporary, true);
duckdb_result result;
if (duckdb_query(connection, drop_sql.c_str(), &result) == DuckDBError) {
SetError(error, duckdb_result_error(&result));
duckdb_destroy_result(&result);
return ADBC_STATUS_INTERNAL;
}
duckdb_destroy_result(&result);
if (duckdb_query(connection, create_sql.c_str(), &result) == DuckDBError) {
SetError(error, duckdb_result_error(&result));
duckdb_destroy_result(&result);
Expand All @@ -840,7 +865,7 @@ AdbcStatusCode Ingest(duckdb_connection connection, const char *table_name, cons
}
case IngestionMode::CREATE_APPEND: {
// CREATE_APPEND mode: Create if not exists, append if exists
auto sql = BuildCreateTableSQL(schema, table_name, types, names, true);
auto sql = BuildCreateTableSQL(effective_catalog, effective_schema, table_name, types, names, true, temporary);
duckdb_result result;
if (duckdb_query(connection, sql.c_str(), &result) == DuckDBError) {
SetError(error, duckdb_result_error(&result));
Expand All @@ -851,7 +876,7 @@ AdbcStatusCode Ingest(duckdb_connection connection, const char *table_name, cons
break;
}
}
AppenderWrapper appender(connection, schema, table_name);
AppenderWrapper appender(connection, effective_catalog, effective_schema, table_name);
if (!appender.Valid()) {
return ADBC_STATUS_INTERNAL;
}
Expand Down Expand Up @@ -919,6 +944,7 @@ AdbcStatusCode StatementNew(struct AdbcConnection *connection, struct AdbcStatem
statement_wrapper->statement = nullptr;
statement_wrapper->ingestion_stream.release = nullptr;
statement_wrapper->ingestion_table_name = nullptr;
statement_wrapper->target_catalog = nullptr;
statement_wrapper->db_schema = nullptr;
statement_wrapper->temporary_table = false;

Expand All @@ -943,6 +969,10 @@ AdbcStatusCode StatementRelease(struct AdbcStatement *statement, struct AdbcErro
free(wrapper->ingestion_table_name);
wrapper->ingestion_table_name = nullptr;
}
if (wrapper->target_catalog) {
free(wrapper->target_catalog);
wrapper->target_catalog = nullptr;
}
if (wrapper->db_schema) {
free(wrapper->db_schema);
wrapper->db_schema = nullptr;
Expand Down Expand Up @@ -1019,8 +1049,9 @@ static AdbcStatusCode IngestToTableFromBoundStream(DuckDBAdbcStatementWrapper *s
auto stream = statement->ingestion_stream;

// Ingest into a table from the bound stream
return Ingest(statement->connection, statement->ingestion_table_name, statement->db_schema, &stream, error,
statement->ingestion_mode, statement->temporary_table, rows_affected);
return Ingest(statement->connection, statement->target_catalog, statement->ingestion_table_name,
statement->db_schema, &stream, error, statement->ingestion_mode, statement->temporary_table,
rows_affected);
}

AdbcStatusCode StatementExecuteQuery(struct AdbcStatement *statement, struct ArrowArrayStream *out,
Expand Down Expand Up @@ -1354,15 +1385,25 @@ AdbcStatusCode StatementSetOption(struct AdbcStatement *statement, const char *k
auto wrapper = static_cast<DuckDBAdbcStatementWrapper *>(statement->private_data);

if (strcmp(key, ADBC_INGEST_OPTION_TARGET_TABLE) == 0) {
if (wrapper->ingestion_table_name) {
free(wrapper->ingestion_table_name);
}
wrapper->ingestion_table_name = strdup(value);
wrapper->temporary_table = false;
return ADBC_STATUS_OK;
}
if (strcmp(key, ADBC_INGEST_OPTION_TEMPORARY) == 0) {
if (strcmp(value, ADBC_OPTION_VALUE_ENABLED) == 0) {
// Align with arrow-adbc PostgreSQL driver behavior: if a schema was set
// before enabling temporary ingestion, clear it so temporary can proceed.
// (Some clients set schema by default.)
if (wrapper->db_schema) {
SetError(error, "Temporary option is not supported with schema");
return ADBC_STATUS_INVALID_ARGUMENT;
free(wrapper->db_schema);
wrapper->db_schema = nullptr;
}
// Some clients may also set a catalog by default; clear it so temporary can proceed.
if (wrapper->target_catalog) {
free(wrapper->target_catalog);
wrapper->target_catalog = nullptr;
}
wrapper->temporary_table = true;
return ADBC_STATUS_OK;
Expand All @@ -1378,14 +1419,21 @@ AdbcStatusCode StatementSetOption(struct AdbcStatement *statement, const char *k
}

if (strcmp(key, ADBC_INGEST_OPTION_TARGET_DB_SCHEMA) == 0) {
if (wrapper->temporary_table) {
SetError(error, "Temporary option is not supported with schema");
return ADBC_STATUS_INVALID_ARGUMENT;
if (wrapper->db_schema) {
free(wrapper->db_schema);
}
wrapper->db_schema = strdup(value);
return ADBC_STATUS_OK;
}

if (strcmp(key, ADBC_INGEST_OPTION_TARGET_CATALOG) == 0) {
if (wrapper->target_catalog) {
free(wrapper->target_catalog);
}
wrapper->target_catalog = strdup(value);
return ADBC_STATUS_OK;
}

if (strcmp(key, ADBC_INGEST_OPTION_MODE) == 0) {
if (strcmp(value, ADBC_INGEST_OPTION_MODE_CREATE) == 0) {
wrapper->ingestion_mode = IngestionMode::CREATE;
Expand Down
7 changes: 7 additions & 0 deletions src/duckdb/src/execution/physical_plan_generator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,4 +185,11 @@ PhysicalOperator &PhysicalPlanGenerator::CreatePlan(LogicalOperator &op) {
throw InternalException("Physical plan generator - no plan generated");
}

ArenaAllocator &PhysicalPlanGenerator::ArenaRef() {
if (!physical_plan) {
physical_plan = make_uniq<PhysicalPlan>(Allocator::Get(context));
}
return physical_plan->ArenaRef();
}

} // namespace duckdb
6 changes: 3 additions & 3 deletions src/duckdb/src/function/table/version/pragma_version.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#ifndef DUCKDB_PATCH_VERSION
#define DUCKDB_PATCH_VERSION "0-dev5446"
#define DUCKDB_PATCH_VERSION "0-dev5469"
#endif
#ifndef DUCKDB_MINOR_VERSION
#define DUCKDB_MINOR_VERSION 5
Expand All @@ -8,10 +8,10 @@
#define DUCKDB_MAJOR_VERSION 1
#endif
#ifndef DUCKDB_VERSION
#define DUCKDB_VERSION "v1.5.0-dev5446"
#define DUCKDB_VERSION "v1.5.0-dev5469"
#endif
#ifndef DUCKDB_SOURCE_ID
#define DUCKDB_SOURCE_ID "13e1f2229c"
#define DUCKDB_SOURCE_ID "f5174f0d6d"
#endif
#include "duckdb/function/table/system_functions.hpp"
#include "duckdb/main/database.hpp"
Expand Down
14 changes: 14 additions & 0 deletions src/duckdb/src/include/duckdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -3456,6 +3456,20 @@ This allows NULL values to be written to the vector, regardless of whether a val
*/
DUCKDB_C_API void duckdb_vector_ensure_validity_writable(duckdb_vector vector);

/*!
Safely assigns a string element in the vector at the specified location. Supersedes
`duckdb_vector_assign_string_element`. The vector type must be VARCHAR and the input must be valid UTF-8. Otherwise, it
returns an invalid Unicode error.

* @param vector The vector to alter
* @param index The row position in the vector to assign the string to
* @param str The null-terminated string
* @return If valid UTF-8, then `nullptr`, else error information. If not `nullptr`, then the return value must be
destroyed with `duckdb_destroy_error_data`.
*/
DUCKDB_C_API duckdb_error_data duckdb_vector_safe_assign_string_element(duckdb_vector vector, idx_t index,
const char *str);

/*!
Assigns a string element in the vector at the specified location.

Expand Down
14 changes: 11 additions & 3 deletions src/duckdb/src/include/duckdb/common/adbc/adbc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,18 @@ namespace duckdb_adbc {

class AppenderWrapper {
public:
AppenderWrapper(duckdb_connection conn, const char *schema, const char *table) : appender(nullptr) {
if (duckdb_appender_create(conn, schema, table, &appender) != DuckDBSuccess) {
appender = nullptr;
AppenderWrapper(duckdb_connection conn, const char *catalog, const char *schema, const char *table)
: appender(nullptr) {
// Note: duckdb_appender_create_ext allocates an internal wrapper even on failure.
// If creation fails, make sure to destroy it to avoid leaking.
auto created = duckdb_appender(nullptr);
if (duckdb_appender_create_ext(conn, catalog, schema, table, &created) != DuckDBSuccess) {
if (created) {
duckdb_appender_destroy(&created);
}
return;
}
appender = created;
}
~AppenderWrapper() {
if (appender) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ class PhysicalPlanGenerator {
return physical_plan->Make<T>(std::forward<ARGS>(args)...);
}

//! Get a reference to the ArenaAllocator of the underlying physical plan.
//! Creates a new (empty) physical plan if none exists yet.
ArenaAllocator &ArenaRef();

public:
PhysicalOperator &ResolveDefaultsProjection(LogicalInsert &op, PhysicalOperator &child);

Expand Down
2 changes: 2 additions & 0 deletions src/duckdb/src/include/duckdb/main/capi/extension_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,7 @@ typedef struct {
sel_t *(*duckdb_selection_vector_get_data_ptr)(duckdb_selection_vector sel);
void (*duckdb_vector_copy_sel)(duckdb_vector src, duckdb_vector dst, duckdb_selection_vector sel, idx_t src_count,
idx_t src_offset, idx_t dst_offset);
duckdb_error_data (*duckdb_vector_safe_assign_string_element)(duckdb_vector vector, idx_t index, const char *str);
} duckdb_ext_api_v1;

//===--------------------------------------------------------------------===//
Expand Down Expand Up @@ -1221,6 +1222,7 @@ inline duckdb_ext_api_v1 CreateAPIv1() {
result.duckdb_destroy_selection_vector = duckdb_destroy_selection_vector;
result.duckdb_selection_vector_get_data_ptr = duckdb_selection_vector_get_data_ptr;
result.duckdb_vector_copy_sel = duckdb_vector_copy_sel;
result.duckdb_vector_safe_assign_string_element = duckdb_vector_safe_assign_string_element;
return result;
}

Expand Down
10 changes: 7 additions & 3 deletions src/duckdb/src/include/duckdb/storage/caching_file_system.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#pragma once

#include "duckdb/common/enums/cache_validation_mode.hpp"
#include "duckdb/common/file_opener.hpp"
#include "duckdb/common/file_open_flags.hpp"
#include "duckdb/common/open_file_info.hpp"
#include "duckdb/common/winapi.hpp"
Expand All @@ -35,7 +36,7 @@ struct CachingFileHandle {

public:
DUCKDB_API CachingFileHandle(QueryContext context, CachingFileSystem &caching_file_system, const OpenFileInfo &path,
FileOpenFlags flags, CachedFile &cached_file);
FileOpenFlags flags, optional_ptr<FileOpener> opener, CachedFile &cached_file);
DUCKDB_API ~CachingFileHandle();

public:
Expand Down Expand Up @@ -89,6 +90,8 @@ struct CachingFileHandle {
OpenFileInfo path;
//! Flags used to open the file
FileOpenFlags flags;
//! File opener, which contains file open context.
optional_ptr<FileOpener> opener;
//! Cache validation mode for this file
CacheValidationMode validate;
//! The associated CachedFile with cached ranges
Expand Down Expand Up @@ -119,9 +122,10 @@ class CachingFileSystem {
public:
DUCKDB_API static CachingFileSystem Get(ClientContext &context);

DUCKDB_API unique_ptr<CachingFileHandle> OpenFile(const OpenFileInfo &path, FileOpenFlags flags);
DUCKDB_API unique_ptr<CachingFileHandle> OpenFile(const OpenFileInfo &path, FileOpenFlags flags,
optional_ptr<FileOpener> opener = nullptr);
DUCKDB_API unique_ptr<CachingFileHandle> OpenFile(QueryContext context, const OpenFileInfo &path,
FileOpenFlags flags);
FileOpenFlags flags, optional_ptr<FileOpener> opener = nullptr);

private:
//! The Client FileSystem (needs to be client-specific so we can do, e.g., HTTPFS profiling)
Expand Down
Loading