Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions chdb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@
import os


_arrow_format = set({"dataframe", "arrowtable"})
_process_result_format_funs = {
"dataframe" : lambda x : to_df(x),
"arrowtable": lambda x : to_arrowTable(x)
}

# If any UDF is defined, the path of the UDF will be set to this variable
# and the path will be deleted when the process exits
# UDF config path will be f"{g_udf_path}/udf_config.xml"
Expand Down Expand Up @@ -60,9 +66,10 @@ def query(sql, output_format="CSV", path="", udf_path=""):
if udf_path != "":
g_udf_path = udf_path
lower_output_format = output_format.lower()
if lower_output_format == "dataframe":
return to_df(_chdb.query(sql, "Arrow", path=path, udf_path=g_udf_path))
elif lower_output_format == "arrowtable":
return to_arrowTable(_chdb.query(sql, "Arrow", path=path, udf_path=g_udf_path))
else:
return _chdb.query(sql, output_format, path=path, udf_path=g_udf_path)
result_func = _process_result_format_funs.get(lower_output_format, lambda x : x)
if lower_output_format in _arrow_format:
output_format = "Arrow"
res = _chdb.query(sql, output_format, path=path, udf_path=g_udf_path)
if res.has_error():
raise Exception(res.error_message())
return result_func(res)
5 changes: 4 additions & 1 deletion chdb/dbapi/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,10 @@ def _execute_command(self, sql):
if DEBUG:
print("DEBUG: query:", sql)
try:
self._resp = self._session.query(sql, fmt="JSON").data()
res = self._session.query(sql, output_format="JSON")
if res.has_error():
raise err.DatabaseError(res.error_message())
self._resp = res.data()
except Exception as error:
raise err.InterfaceError("query err: %s" % error)

Expand Down
10 changes: 6 additions & 4 deletions programs/local/LocalChdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
extern bool inside_main = true;


local_result * queryToBuffer(
local_result_v2 * queryToBuffer(
const std::string & queryStr,
const std::string & output_format = "CSV",
const std::string & path = {},
Expand Down Expand Up @@ -51,7 +51,7 @@ local_result * queryToBuffer(
for (auto & arg : argv)
argv_char.push_back(const_cast<char *>(arg.c_str()));

return query_stable(argv_char.size(), argv_char.data());
return query_stable_v2(argv_char.size(), argv_char.data());
}

// Pybind11 will take over the ownership of the `query_result` object
Expand Down Expand Up @@ -132,7 +132,7 @@ PYBIND11_MODULE(_chdb, m)
.def("view", &memoryview_wrapper::view);

py::class_<query_result>(m, "query_result")
.def(py::init<local_result *>(), py::return_value_policy::take_ownership)
.def(py::init<local_result_v2 *>(), py::return_value_policy::take_ownership)
.def("data", &query_result::data)
.def("bytes", &query_result::bytes)
.def("__str__", &query_result::str)
Expand All @@ -142,7 +142,9 @@ PYBIND11_MODULE(_chdb, m)
.def("rows_read", &query_result::rows_read)
.def("bytes_read", &query_result::bytes_read)
.def("elapsed", &query_result::elapsed)
.def("get_memview", &query_result::get_memview);
.def("get_memview", &query_result::get_memview)
.def("has_error", &query_result::has_error)
.def("error_message", &query_result::error_message);


m.def(
Expand Down
26 changes: 22 additions & 4 deletions programs/local/LocalChdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ class __attribute__((visibility("default"))) query_result;
class local_result_wrapper
{
private:
local_result * result;
local_result_v2 * result;

public:
local_result_wrapper(local_result * result) : result(result) { }
local_result_wrapper(local_result_v2 * result) : result(result) { }
~local_result_wrapper()
{
free_result(result);
free_result_v2(result);
delete result;
}
char * data()
Expand Down Expand Up @@ -81,6 +81,22 @@ class local_result_wrapper
}
return result->elapsed;
}
bool has_error()
{
if (result == nullptr)
{
return false;
}
return result->error_message != nullptr;
}
py::str error_message()
{
if (has_error())
{
return py::str(result->error_message);
}
return py::str();
}
};

class query_result
Expand All @@ -89,7 +105,7 @@ class query_result
std::shared_ptr<local_result_wrapper> result_wrapper;

public:
query_result(local_result * result) : result_wrapper(std::make_shared<local_result_wrapper>(result)) { }
query_result(local_result_v2 * result) : result_wrapper(std::make_shared<local_result_wrapper>(result)) { }
~query_result() { }
char * data() { return result_wrapper->data(); }
py::bytes bytes() { return result_wrapper->bytes(); }
Expand All @@ -98,6 +114,8 @@ class query_result
size_t rows_read() { return result_wrapper->rows_read(); }
size_t bytes_read() { return result_wrapper->bytes_read(); }
double elapsed() { return result_wrapper->elapsed(); }
bool has_error() { return result_wrapper->has_error(); }
py::str error_message() { return result_wrapper->error_message(); }
memoryview_wrapper * get_memview();
};

Expand Down
118 changes: 84 additions & 34 deletions programs/local/LocalServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include <Interpreters/ProcessList.h>
#include <Interpreters/loadMetadata.h>
#include <base/getFQDNOrHostName.h>
#include <Common/scope_guard_safe.h>
#include <Interpreters/Session.h>
#include <Access/AccessControl.h>
#include <Common/Exception.h>
Expand All @@ -34,15 +33,12 @@
#include <Common/quoteString.h>
#include <Common/randomSeed.h>
#include <Common/ThreadPool.h>
#include <Loggers/Loggers.h>
#include <Loggers/OwnFormattingChannel.h>
#include <Loggers/OwnPatternFormatter.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/UseSSL.h>
#include <IO/SharedThreadPools.h>
#include <Parsers/IAST.h>
#include <Parsers/ASTInsertQuery.h>
#include <Common/ErrorHandlers.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
Expand All @@ -59,6 +55,7 @@
#include <base/argsToConfig.h>
#include <filesystem>
#include <fstream>
#include <memory>

#include "config.h"

Expand Down Expand Up @@ -563,14 +560,14 @@ catch (const DB::Exception & e)
cleanup();

bool need_print_stack_trace = config().getBool("stacktrace", false);
std::cerr << getExceptionMessage(e, need_print_stack_trace, true) << std::endl;
error_message_oss << getExceptionMessage(e, need_print_stack_trace, true);
return e.code() ? e.code() : -1;
}
catch (...)
{
cleanup();

std::cerr << getCurrentExceptionMessage(false) << std::endl;
error_message_oss << getCurrentExceptionMessage(false);
return getCurrentExceptionCode();
}

Expand Down Expand Up @@ -1024,12 +1021,26 @@ void LocalServer::readArguments(int argc, char ** argv, Arguments & common_argum
class query_result_
{
public:
uint64_t rows;
uint64_t bytes;
double elapsed;
std::vector<char> * buf;
explicit query_result_(std::vector<char>* buf, uint64_t rows,
uint64_t bytes, double elapsed):
rows_(rows), bytes_(bytes), elapsed_(elapsed),
buf_(buf) { }

explicit query_result_(std::string&& error_msg): error_msg_(error_msg) { }

std::string string()
{
return std::string(buf_->begin(), buf_->end());
}

uint64_t rows_;
uint64_t bytes_;
double elapsed_;
std::vector<char> * buf_;
std::string error_msg_;
};


std::unique_ptr<query_result_> pyEntryClickHouseLocal(int argc, char ** argv)
{
try
Expand All @@ -1039,18 +1050,13 @@ std::unique_ptr<query_result_> pyEntryClickHouseLocal(int argc, char ** argv)
int ret = app.run();
if (ret == 0)
{
auto result = std::make_unique<query_result_>();
result->buf = app.getQueryOutputVector();
result->rows = app.getProcessedRows();
result->bytes = app.getProcessedBytes();
result->elapsed = app.getElapsedTime();

// std::cerr << std::string(out->begin(), out->end()) << std::endl;
return result;
}
else
{
return nullptr;
return std::make_unique<query_result_>(
app.getQueryOutputVector(),
app.getProcessedRows(),
app.getProcessedBytes(),
app.getElapsedTime());
} else {
return std::make_unique<query_result_>(app.get_error_msg());
}
}
catch (const DB::Exception & e)
Expand All @@ -1072,29 +1078,73 @@ std::unique_ptr<query_result_> pyEntryClickHouseLocal(int argc, char ** argv)
local_result * query_stable(int argc, char ** argv)
{
auto result = pyEntryClickHouseLocal(argc, argv);
if (!result || !result->buf)
if (result->error_msg_.empty())
{
return nullptr;
}
local_result * res = new local_result;
res->len = result->buf->size();
res->buf = result->buf->data();
res->_vec = result->buf;
res->rows_read = result->rows;
res->bytes_read = result->bytes;
res->elapsed = result->elapsed;
res->len = result->buf_->size();
res->buf = result->buf_->data();
res->_vec = result->buf_;
res->rows_read = result->rows_;
res->bytes_read = result->bytes_;
res->elapsed = result->elapsed_;
return res;
}

void free_result(local_result * result)
{
if (!result || !result->_vec)
if (!result)
{
return;
}
if (result->_vec)
{
std::vector<char> * vec = reinterpret_cast<std::vector<char> *>(result->_vec);
delete vec;
result->_vec = nullptr;
}
}

local_result_v2 * query_stable_v2(int argc, char ** argv)
{
auto result = pyEntryClickHouseLocal(argc, argv);
local_result_v2 * res = new local_result_v2;
if (!result->error_msg_.empty())
{
res->error_message = new char[result->error_msg_.size() + 1];
memcpy(res->error_message, result->error_msg_.c_str(), result->error_msg_.size() + 1);
res->_vec = nullptr;
res->buf = nullptr;
} else {
res->len = result->buf_->size();
res->buf = result->buf_->data();
res->_vec = result->buf_;
res->rows_read = result->rows_;
res->bytes_read = result->bytes_;
res->elapsed = result->elapsed_;
res->error_message = nullptr;
}
return res;
}

void free_result_v2(local_result_v2 * result)
{
if (!result)
{
return;
}
std::vector<char> * vec = reinterpret_cast<std::vector<char> *>(result->_vec);
delete vec;
result->_vec = nullptr;
if (result->_vec)
{
std::vector<char> * vec = reinterpret_cast<std::vector<char> *>(result->_vec);
delete vec;
result->_vec = nullptr;
}
if (result->error_message)
{
delete[] result->error_message;
result->error_message = nullptr;
}
}

/**
Expand Down Expand Up @@ -1125,7 +1175,7 @@ int mainEntryClickHouseLocal(int argc, char ** argv)
auto result = pyEntryClickHouseLocal(argc, argv);
if (result)
{
std::cout << std::string(result->buf->begin(), result->buf->end()) << std::endl;
std::cout << result->string() << std::endl;
return 0;
}
else
Expand Down
14 changes: 14 additions & 0 deletions programs/local/chdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,23 @@ struct CHDB_EXPORT local_result
uint64_t bytes_read;
};

struct CHDB_EXPORT local_result_v2
{
char * buf;
size_t len;
void * _vec; // std::vector<char> *, for freeing
double elapsed;
uint64_t rows_read;
uint64_t bytes_read;
char * error_message;
};

CHDB_EXPORT struct local_result * query_stable(int argc, char ** argv);
CHDB_EXPORT void free_result(struct local_result * result);

CHDB_EXPORT struct local_result_v2 * query_stable_v2(int argc, char ** argv);
CHDB_EXPORT void free_result_v2(struct local_result_v2 * result);

#ifdef __cplusplus
}
#endif
2 changes: 2 additions & 0 deletions src/Client/ClientBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class ClientBase : public Poco::Util::Application, public IHints<2>
size_t getProcessedRows() const { return processed_rows; }
size_t getProcessedBytes() const { return processed_bytes; }
double getElapsedTime() const { return progress_indication.elapsedSeconds(); }
std::string get_error_msg() const { return error_message_oss.str(); }

std::vector<String> getAllRegisteredNames() const override { return cmd_options; }

Expand Down Expand Up @@ -292,6 +293,7 @@ class ClientBase : public Poco::Util::Application, public IHints<2>
size_t processed_rows = 0; /// How many rows have been read or written.
size_t processed_bytes = 0; /// How many bytes have been read or written.
bool print_num_processed_rows = false; /// Whether to print the number of processed rows at
std::stringstream error_message_oss; /// error message stringstream

bool print_stack_trace = false;
/// The last exception that was received from the server. Is used for the
Expand Down
4 changes: 4 additions & 0 deletions tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ class TestBasic(unittest.TestCase):
def test_basic(self):
res = chdb.query("SELECT 1", "CSV")
self.assertEqual(len(res), 2) # "1\n"
self.assertFalse(res.has_error())
self.assertTrue(len(res.error_message()) == 0)
with self.assertRaises(Exception):
res = chdb.query("SELECT 1", "csv")
class TestOutput(unittest.TestCase):
def test_output(self):
for format, output in format_output.items():
Expand Down