Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/Processors/Sources/PythonSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ extern const int PY_EXCEPTION_OCCURED;

PythonSource::PythonSource(
py::object & data_source_,
bool isInheritsFromPyReader_,
const Block & sample_block_,
PyColumnVecPtr column_cache,
size_t data_source_row_count,
Expand All @@ -56,6 +57,7 @@ PythonSource::PythonSource(
size_t num_streams)
: ISource(sample_block_.cloneEmpty())
, data_source(data_source_)
, isInheritsFromPyReader(isInheritsFromPyReader_)
, sample_block(sample_block_)
, column_cache(column_cache)
, data_source_row_count(data_source_row_count)
Expand Down Expand Up @@ -544,7 +546,7 @@ Chunk PythonSource::generate()

try
{
if (isInheritsFromPyReader(data_source))
if (isInheritsFromPyReader)
{
PyObjectVecPtr data;
py::gil_scoped_acquire acquire;
Expand Down
2 changes: 2 additions & 0 deletions src/Processors/Sources/PythonSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class PythonSource : public ISource
public:
PythonSource(
py::object & data_source_,
bool isInheritsFromPyReader_,
const Block & sample_block_,
PyColumnVecPtr column_cache,
size_t data_source_row_count,
Expand All @@ -42,6 +43,7 @@ class PythonSource : public ISource

private:
py::object & data_source; // Do not own the reference
bool isInheritsFromPyReader; // If the data_source is a PyReader object

Block sample_block;
PyColumnVecPtr column_cache;
Expand Down
5 changes: 3 additions & 2 deletions src/Storages/StoragePython.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ Pipe StoragePython::read(

if (isInheritsFromPyReader(data_source))
{
return Pipe(std::make_shared<PythonSource>(data_source, sample_block, column_cache, data_source_row_count, max_block_size, 0, 1));
return Pipe(
std::make_shared<PythonSource>(data_source, true, sample_block, column_cache, data_source_row_count, max_block_size, 0, 1));
}

prepareColumnCache(column_names, sample_block.getColumns(), sample_block);
Expand All @@ -79,7 +80,7 @@ Pipe StoragePython::read(
// num_streams = 32; // for chdb testing
for (size_t stream = 0; stream < num_streams; ++stream)
pipes.emplace_back(std::make_shared<PythonSource>(
data_source, sample_block, column_cache, data_source_row_count, max_block_size, stream, num_streams));
data_source, false, sample_block, column_cache, data_source_row_count, max_block_size, stream, num_streams));
return Pipe::unitePipes(std::move(pipes));
}

Expand Down
43 changes: 43 additions & 0 deletions tests/queries.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
SELECT COUNT(*) FROM Python(hits);
SELECT COUNT(*) FROM Python(hits) WHERE AdvEngineID <> 0;
SELECT SUM(AdvEngineID), COUNT(*), AVG(ResolutionWidth) FROM Python(hits);
SELECT AVG(UserID) FROM Python(hits);
SELECT COUNT(DISTINCT UserID) FROM Python(hits);
SELECT COUNT(DISTINCT SearchPhrase) FROM Python(hits);
SELECT MIN(EventDate), MAX(EventDate) FROM Python(hits);
SELECT AdvEngineID, COUNT(*) FROM Python(hits) WHERE AdvEngineID <> 0 GROUP BY AdvEngineID ORDER BY COUNT(*) DESC;
SELECT RegionID, COUNT(DISTINCT UserID) AS u FROM Python(hits) GROUP BY RegionID ORDER BY u DESC LIMIT 10;
SELECT RegionID, SUM(AdvEngineID), COUNT(*) AS c, AVG(ResolutionWidth), COUNT(DISTINCT UserID) FROM Python(hits) GROUP BY RegionID ORDER BY c DESC LIMIT 10;
SELECT MobilePhoneModel, COUNT(DISTINCT UserID) AS u FROM Python(hits) WHERE MobilePhoneModel <> '' GROUP BY MobilePhoneModel ORDER BY u DESC LIMIT 10;
SELECT MobilePhone, MobilePhoneModel, COUNT(DISTINCT UserID) AS u FROM Python(hits) WHERE MobilePhoneModel <> '' GROUP BY MobilePhone, MobilePhoneModel ORDER BY u DESC LIMIT 10;
SELECT SearchPhrase, COUNT(*) AS c FROM Python(hits) WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10;
SELECT SearchPhrase, COUNT(DISTINCT UserID) AS u FROM Python(hits) WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY u DESC LIMIT 10;
SELECT SearchEngineID, SearchPhrase, COUNT(*) AS c FROM Python(hits) WHERE SearchPhrase <> '' GROUP BY SearchEngineID, SearchPhrase ORDER BY c DESC LIMIT 10;
SELECT UserID, COUNT(*) FROM Python(hits) GROUP BY UserID ORDER BY COUNT(*) DESC LIMIT 10;
SELECT UserID, SearchPhrase, COUNT(*) FROM Python(hits) GROUP BY UserID, SearchPhrase ORDER BY COUNT(*) DESC LIMIT 10;
SELECT UserID, SearchPhrase, COUNT(*) FROM Python(hits) GROUP BY UserID, SearchPhrase LIMIT 10;
SELECT UserID, extract(minute FROM EventTime) AS m, SearchPhrase, COUNT(*) FROM Python(hits) GROUP BY UserID, m, SearchPhrase ORDER BY COUNT(*) DESC LIMIT 10;
SELECT UserID FROM Python(hits) WHERE UserID = 435090932899640449;
SELECT COUNT(*) FROM Python(hits) WHERE URL LIKE '%google%';
SELECT SearchPhrase, MIN(URL), COUNT(*) AS c FROM Python(hits) WHERE URL LIKE '%google%' AND SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10;
SELECT SearchPhrase, MIN(URL), MIN(Title), COUNT(*) AS c, COUNT(DISTINCT UserID) FROM Python(hits) WHERE Title LIKE '%Google%' AND URL NOT LIKE '%.google.%' AND SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10;
SELECT * FROM Python(hits) WHERE URL LIKE '%google%' ORDER BY EventTime LIMIT 10;
SELECT SearchPhrase FROM Python(hits) WHERE SearchPhrase <> '' ORDER BY EventTime LIMIT 10;
SELECT SearchPhrase FROM Python(hits) WHERE SearchPhrase <> '' ORDER BY SearchPhrase LIMIT 10;
SELECT SearchPhrase FROM Python(hits) WHERE SearchPhrase <> '' ORDER BY EventTime, SearchPhrase LIMIT 10;
SELECT CounterID, AVG(length(URL)) AS l, COUNT(*) AS c FROM Python(hits) WHERE URL <> '' GROUP BY CounterID HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;
SELECT REGEXP_REPLACE(Referer, '^https?://(?:www\.)?([^/]+)/.*$', '\1') AS k, AVG(length(Referer)) AS l, COUNT(*) AS c, MIN(Referer) FROM Python(hits) WHERE Referer <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;
SELECT SUM(ResolutionWidth), SUM(ResolutionWidth + 1), SUM(ResolutionWidth + 2), SUM(ResolutionWidth + 3), SUM(ResolutionWidth + 4), SUM(ResolutionWidth + 5), SUM(ResolutionWidth + 6), SUM(ResolutionWidth + 7), SUM(ResolutionWidth + 8), SUM(ResolutionWidth + 9), SUM(ResolutionWidth + 10), SUM(ResolutionWidth + 11), SUM(ResolutionWidth + 12), SUM(ResolutionWidth + 13), SUM(ResolutionWidth + 14), SUM(ResolutionWidth + 15), SUM(ResolutionWidth + 16), SUM(ResolutionWidth + 17), SUM(ResolutionWidth + 18), SUM(ResolutionWidth + 19), SUM(ResolutionWidth + 20), SUM(ResolutionWidth + 21), SUM(ResolutionWidth + 22), SUM(ResolutionWidth + 23), SUM(ResolutionWidth + 24), SUM(ResolutionWidth + 25), SUM(ResolutionWidth + 26), SUM(ResolutionWidth + 27), SUM(ResolutionWidth + 28), SUM(ResolutionWidth + 29), SUM(ResolutionWidth + 30), SUM(ResolutionWidth + 31), SUM(ResolutionWidth + 32), SUM(ResolutionWidth + 33), SUM(ResolutionWidth + 34), SUM(ResolutionWidth + 35), SUM(ResolutionWidth + 36), SUM(ResolutionWidth + 37), SUM(ResolutionWidth + 38), SUM(ResolutionWidth + 39), SUM(ResolutionWidth + 40), SUM(ResolutionWidth + 41), SUM(ResolutionWidth + 42), SUM(ResolutionWidth + 43), SUM(ResolutionWidth + 44), SUM(ResolutionWidth + 45), SUM(ResolutionWidth + 46), SUM(ResolutionWidth + 47), SUM(ResolutionWidth + 48), SUM(ResolutionWidth + 49), SUM(ResolutionWidth + 50), SUM(ResolutionWidth + 51), SUM(ResolutionWidth + 52), SUM(ResolutionWidth + 53), SUM(ResolutionWidth + 54), SUM(ResolutionWidth + 55), SUM(ResolutionWidth + 56), SUM(ResolutionWidth + 57), SUM(ResolutionWidth + 58), SUM(ResolutionWidth + 59), SUM(ResolutionWidth + 60), SUM(ResolutionWidth + 61), SUM(ResolutionWidth + 62), SUM(ResolutionWidth + 63), SUM(ResolutionWidth + 64), SUM(ResolutionWidth + 65), SUM(ResolutionWidth + 66), SUM(ResolutionWidth + 67), SUM(ResolutionWidth + 68), SUM(ResolutionWidth + 69), SUM(ResolutionWidth + 70), SUM(ResolutionWidth + 71), SUM(ResolutionWidth + 72), SUM(ResolutionWidth + 73), SUM(ResolutionWidth + 74), SUM(ResolutionWidth + 75), SUM(ResolutionWidth + 76), SUM(ResolutionWidth + 77), SUM(ResolutionWidth + 78), SUM(ResolutionWidth + 79), SUM(ResolutionWidth + 80), SUM(ResolutionWidth + 81), SUM(ResolutionWidth + 82), SUM(ResolutionWidth + 83), SUM(ResolutionWidth + 84), SUM(ResolutionWidth + 85), SUM(ResolutionWidth + 86), SUM(ResolutionWidth + 87), SUM(ResolutionWidth + 88), SUM(ResolutionWidth + 89) FROM Python(hits);
SELECT SearchEngineID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM Python(hits) WHERE SearchPhrase <> '' GROUP BY SearchEngineID, ClientIP ORDER BY c DESC LIMIT 10;
SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM Python(hits) WHERE SearchPhrase <> '' GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10;
SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM Python(hits) GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10;
SELECT URL, COUNT(*) AS c FROM Python(hits) GROUP BY URL ORDER BY c DESC LIMIT 10;
SELECT 1, URL, COUNT(*) AS c FROM Python(hits) GROUP BY 1, URL ORDER BY c DESC LIMIT 10;
SELECT ClientIP, ClientIP - 1, ClientIP - 2, ClientIP - 3, COUNT(*) AS c FROM Python(hits) GROUP BY ClientIP, ClientIP - 1, ClientIP - 2, ClientIP - 3 ORDER BY c DESC LIMIT 10;
SELECT URL, COUNT(*) AS PageViews FROM Python(hits) WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND DontCountHits = 0 AND IsRefresh = 0 AND URL <> '' GROUP BY URL ORDER BY PageViews DESC LIMIT 10;
SELECT Title, COUNT(*) AS PageViews FROM Python(hits) WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND DontCountHits = 0 AND IsRefresh = 0 AND Title <> '' GROUP BY Title ORDER BY PageViews DESC LIMIT 10;
SELECT URL, COUNT(*) AS PageViews FROM Python(hits) WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND IsLink <> 0 AND IsDownload = 0 GROUP BY URL ORDER BY PageViews DESC LIMIT 10 OFFSET 1000;
SELECT TraficSourceID, SearchEngineID, AdvEngineID, CASE WHEN (SearchEngineID = 0 AND AdvEngineID = 0) THEN Referer ELSE '' END AS Src, URL AS Dst, COUNT(*) AS PageViews FROM Python(hits) WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 GROUP BY TraficSourceID, SearchEngineID, AdvEngineID, Src, Dst ORDER BY PageViews DESC LIMIT 10 OFFSET 1000;
SELECT URLHash, EventDate, COUNT(*) AS PageViews FROM Python(hits) WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND TraficSourceID IN (-1, 6) AND RefererHash = 3594120000172545465 GROUP BY URLHash, EventDate ORDER BY PageViews DESC LIMIT 10 OFFSET 100;
SELECT WindowClientWidth, WindowClientHeight, COUNT(*) AS PageViews FROM Python(hits) WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND DontCountHits = 0 AND URLHash = 2868770270353813622 GROUP BY WindowClientWidth, WindowClientHeight ORDER BY PageViews DESC LIMIT 10 OFFSET 10000;
SELECT DATE_TRUNC('minute', EventTime) AS M, COUNT(*) AS PageViews FROM Python(hits) WHERE CounterID = 62 AND EventDate >= '2013-07-14' AND EventDate <= '2013-07-15' AND IsRefresh = 0 AND DontCountHits = 0 GROUP BY DATE_TRUNC('minute', EventTime) ORDER BY DATE_TRUNC('minute', EventTime) LIMIT 10 OFFSET 1000
96 changes: 96 additions & 0 deletions tests/test_state2_dataframe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#!/usr/bin/env python3

import unittest
import timeit
import datetime
import json
import tempfile
import pandas as pd
import chdb
import os
from urllib.request import urlretrieve


class TestChDBDataFrame(unittest.TestCase):
@classmethod
def setUpClass(cls):
# Download parquet file if it doesn't exist
parquet_file = "hits_0.parquet"
if not os.path.exists(parquet_file):
print(f"Downloading {parquet_file}...")
url = "https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_0.parquet"
urlretrieve(url, parquet_file)
print("Download complete!")

# Load data and prepare DataFrame
cls.hits = pd.read_parquet(parquet_file)
cls.dataframe_size = cls.hits.memory_usage().sum()

# Fix types
cls.hits["EventTime"] = pd.to_datetime(cls.hits["EventTime"], unit="s")
cls.hits["EventDate"] = pd.to_datetime(cls.hits["EventDate"], unit="D")

# Convert object columns to string
for col in cls.hits.columns:
if cls.hits[col].dtype == "O":
cls.hits[col] = cls.hits[col].astype(str)

# Load queries
with open("queries.sql") as f:
cls.queries = f.readlines()

def setUp(self):
self.tmp_dir = tempfile.TemporaryDirectory()
self.conn = chdb.connect(f"{self.tmp_dir.name}")

def tearDown(self):
self.conn.close()
self.tmp_dir.cleanup()

def test_dataframe_size(self):
self.assertGreater(self.dataframe_size, 0, "DataFrame size should be positive")

def test_query_execution(self):
queries_times = []
for i, query in enumerate(self.queries, 1):
times = []
for _ in range(3):
start = timeit.default_timer()
result = self.conn.query(query, "CSV")
end = timeit.default_timer()
times.append(end - start)

# Verify query results are not empty
self.assertIsNotNone(result, f"Query {i} returned None")

queries_times.append(times)
# Verify execution times are reasonable
self.assertTrue(
all(t > 0 for t in times), f"Query {i} has invalid execution times"
)

result_json = {
"system": "chDB 2.2(DataFrame)",
"date": datetime.date.today().strftime("%Y-%m-%d"),
"machine": "",
"cluster_size": 1,
"comment": "",
"tags": [
"C++",
"column-oriented",
"embedded",
"stateless",
"serverless",
"dataframe",
"ClickHouse derivative",
],
"load_time": 0,
"data_size": int(self.dataframe_size),
"result": queries_times, # Will be populated during test_query_execution
}

print(json.dumps(result_json, indent=2))


if __name__ == "__main__":
unittest.main()