Skip to content

Commit 0d9a1de

Browse files
authored
Merge pull request #796 from Altinity/feature/antalya-25.3/fix_remote_calls
25.3 Antalya port #583, #584, #703, #720 - fixes for distributed calls
2 parents 7de1214 + fec06ec commit 0d9a1de

File tree

17 files changed

+238
-52
lines changed

17 files changed

+238
-52
lines changed

src/Interpreters/ClusterProxy/executeQuery.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,7 @@ void executeQuery(
443443
not_optimized_cluster->getName());
444444

445445
read_from_remote->setStepDescription("Read from remote replica");
446+
read_from_remote->setIsRemoteFunction(is_remote_function);
446447
plan->addStep(std::move(read_from_remote));
447448
plan->addInterpreterContext(new_context);
448449
plans.emplace_back(std::move(plan));

src/Interpreters/Context.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2844,8 +2844,11 @@ void Context::setCurrentQueryId(const String & query_id)
28442844

28452845
client_info.current_query_id = query_id_to_set;
28462846

2847-
if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
2847+
if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY
2848+
&& (getApplicationType() != ApplicationType::SERVER || client_info.initial_query_id.empty()))
2849+
{
28482850
client_info.initial_query_id = client_info.current_query_id;
2851+
}
28492852
}
28502853

28512854
void Context::setBackgroundOperationTypeForContext(ClientInfo::BackgroundOperationType background_operation)

src/Interpreters/InterpreterSelectQuery.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
#include <Processors/QueryPlan/TotalsHavingStep.h>
6969
#include <Processors/QueryPlan/WindowStep.h>
7070
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
71+
#include <Processors/QueryPlan/ObjectFilterStep.h>
7172
#include <Processors/Sources/NullSource.h>
7273
#include <Processors/Sources/SourceFromSingleChunk.h>
7374
#include <Processors/Transforms/AggregatingTransform.h>
@@ -189,6 +190,7 @@ namespace Setting
189190
extern const SettingsUInt64 min_count_to_compile_aggregate_expression;
190191
extern const SettingsBool enable_software_prefetch_in_aggregation;
191192
extern const SettingsBool optimize_group_by_constant_keys;
193+
extern const SettingsBool use_hive_partitioning;
192194
}
193195

194196
namespace ServerSetting
@@ -1965,6 +1967,22 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
19651967

19661968
if (expressions.second_stage || from_aggregation_stage)
19671969
{
1970+
if (settings[Setting::use_hive_partitioning]
1971+
&& !expressions.first_stage
1972+
&& expressions.hasWhere())
1973+
{
1974+
if (typeid_cast<ReadFromCluster *>(query_plan.getRootNode()->step.get()))
1975+
{
1976+
auto object_filter_step = std::make_unique<ObjectFilterStep>(
1977+
query_plan.getCurrentHeader(),
1978+
expressions.before_where->dag.clone(),
1979+
getSelectQuery().where()->getColumnName());
1980+
1981+
object_filter_step->setStepDescription("WHERE");
1982+
query_plan.addStep(std::move(object_filter_step));
1983+
}
1984+
}
1985+
19681986
if (from_aggregation_stage)
19691987
{
19701988
/// No need to aggregate anything, since this was done on remote shards.

src/Planner/Planner.cpp

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include <Processors/QueryPlan/WindowStep.h>
3838
#include <Processors/QueryPlan/ReadNothingStep.h>
3939
#include <Processors/QueryPlan/ReadFromRecursiveCTEStep.h>
40+
#include <Processors/QueryPlan/ObjectFilterStep.h>
4041
#include <QueryPipeline/QueryPipelineBuilder.h>
4142

4243
#include <Interpreters/Context.h>
@@ -133,6 +134,7 @@ namespace Setting
133134
extern const SettingsUInt64 min_count_to_compile_aggregate_expression;
134135
extern const SettingsBool enable_software_prefetch_in_aggregation;
135136
extern const SettingsBool optimize_group_by_constant_keys;
137+
extern const SettingsBool use_hive_partitioning;
136138
}
137139

138140
namespace ServerSetting
@@ -413,6 +415,19 @@ void addFilterStep(QueryPlan & query_plan,
413415
query_plan.addStep(std::move(where_step));
414416
}
415417

418+
void addObjectFilterStep(QueryPlan & query_plan,
419+
FilterAnalysisResult & filter_analysis_result,
420+
const std::string & step_description)
421+
{
422+
auto actions = std::move(filter_analysis_result.filter_actions->dag);
423+
424+
auto where_step = std::make_unique<ObjectFilterStep>(query_plan.getCurrentHeader(),
425+
std::move(actions),
426+
filter_analysis_result.filter_column_name);
427+
where_step->setStepDescription(step_description);
428+
query_plan.addStep(std::move(where_step));
429+
}
430+
416431
Aggregator::Params getAggregatorParams(const PlannerContextPtr & planner_context,
417432
const AggregationAnalysisResult & aggregation_analysis_result,
418433
const QueryAnalysisResult & query_analysis_result,
@@ -1677,6 +1692,16 @@ void Planner::buildPlanForQueryNode()
16771692

16781693
if (query_processing_info.isSecondStage() || query_processing_info.isFromAggregationState())
16791694
{
1695+
if (settings[Setting::use_hive_partitioning]
1696+
&& !query_processing_info.isFirstStage()
1697+
&& expression_analysis_result.hasWhere())
1698+
{
1699+
if (typeid_cast<ReadFromCluster *>(query_plan.getRootNode()->step.get()))
1700+
{
1701+
addObjectFilterStep(query_plan, expression_analysis_result.getWhere(), "WHERE");
1702+
}
1703+
}
1704+
16801705
if (query_processing_info.isFromAggregationState())
16811706
{
16821707
/// Aggregation was performed on remote shards
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#include <Processors/QueryPlan/ObjectFilterStep.h>
2+
#include <Processors/QueryPlan/QueryPlanStepRegistry.h>
3+
#include <Processors/QueryPlan/Serialization.h>
4+
#include <Processors/Transforms/FilterTransform.h>
5+
#include <IO/Operators.h>
6+
7+
#include <memory>
8+
9+
namespace DB
10+
{
11+
12+
namespace ErrorCodes
13+
{
14+
extern const int INCORRECT_DATA;
15+
}
16+
17+
ObjectFilterStep::ObjectFilterStep(
18+
const Header & input_header_,
19+
ActionsDAG actions_dag_,
20+
String filter_column_name_)
21+
: actions_dag(std::move(actions_dag_))
22+
, filter_column_name(std::move(filter_column_name_))
23+
{
24+
input_headers.emplace_back(input_header_);
25+
output_header = input_headers.front();
26+
}
27+
28+
QueryPipelineBuilderPtr ObjectFilterStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & /* settings */)
29+
{
30+
return std::move(pipelines.front());
31+
}
32+
33+
void ObjectFilterStep::updateOutputHeader()
34+
{
35+
output_header = input_headers.front();
36+
}
37+
38+
void ObjectFilterStep::serialize(Serialization & ctx) const
39+
{
40+
writeStringBinary(filter_column_name, ctx.out);
41+
42+
actions_dag.serialize(ctx.out, ctx.registry);
43+
}
44+
45+
std::unique_ptr<IQueryPlanStep> ObjectFilterStep::deserialize(Deserialization & ctx)
46+
{
47+
if (ctx.input_headers.size() != 1)
48+
throw Exception(ErrorCodes::INCORRECT_DATA, "ObjectFilterStep must have one input stream");
49+
50+
String filter_column_name;
51+
readStringBinary(filter_column_name, ctx.in);
52+
53+
ActionsDAG actions_dag = ActionsDAG::deserialize(ctx.in, ctx.registry, ctx.context);
54+
55+
return std::make_unique<ObjectFilterStep>(ctx.input_headers.front(), std::move(actions_dag), std::move(filter_column_name));
56+
}
57+
58+
void registerObjectFilterStep(QueryPlanStepRegistry & registry)
59+
{
60+
registry.registerStep("ObjectFilter", ObjectFilterStep::deserialize);
61+
}
62+
63+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#pragma once
2+
#include <Processors/QueryPlan/IQueryPlanStep.h>
3+
#include <Interpreters/ActionsDAG.h>
4+
5+
namespace DB
6+
{
7+
8+
/// Implements WHERE operation.
9+
class ObjectFilterStep : public IQueryPlanStep
10+
{
11+
public:
12+
ObjectFilterStep(
13+
const Header & input_header_,
14+
ActionsDAG actions_dag_,
15+
String filter_column_name_);
16+
17+
String getName() const override { return "ObjectFilter"; }
18+
QueryPipelineBuilderPtr updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & settings) override;
19+
20+
const ActionsDAG & getExpression() const { return actions_dag; }
21+
ActionsDAG & getExpression() { return actions_dag; }
22+
const String & getFilterColumnName() const { return filter_column_name; }
23+
24+
void serialize(Serialization & ctx) const override;
25+
26+
static std::unique_ptr<IQueryPlanStep> deserialize(Deserialization & ctx);
27+
28+
private:
29+
void updateOutputHeader() override;
30+
31+
ActionsDAG actions_dag;
32+
String filter_column_name;
33+
};
34+
35+
}

src/Processors/QueryPlan/Optimizations/optimizePrimaryKeyConditionAndLimit.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <Processors/QueryPlan/FilterStep.h>
44
#include <Processors/QueryPlan/LimitStep.h>
55
#include <Processors/QueryPlan/SourceStepWithFilter.h>
6+
#include <Processors/QueryPlan/ObjectFilterStep.h>
67

78
namespace DB::QueryPlanOptimizations
89
{
@@ -41,6 +42,10 @@ void optimizePrimaryKeyConditionAndLimit(const Stack & stack)
4142
/// So this is likely not needed.
4243
continue;
4344
}
45+
else if (auto * object_filter_step = typeid_cast<ObjectFilterStep *>(iter->node->step.get()))
46+
{
47+
source_step_with_filter->addFilter(object_filter_step->getExpression().clone(), object_filter_step->getFilterColumnName());
48+
}
4449
else
4550
{
4651
break;

src/Processors/QueryPlan/QueryPlanStepRegistry.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ void registerFilterStep(QueryPlanStepRegistry & registry);
4949
void registerTotalsHavingStep(QueryPlanStepRegistry & registry);
5050
void registerExtremesStep(QueryPlanStepRegistry & registry);
5151
void registerJoinStep(QueryPlanStepRegistry & registry);
52+
void registerObjectFilterStep(QueryPlanStepRegistry & registry);
5253

5354
void QueryPlanStepRegistry::registerPlanSteps()
5455
{
@@ -67,6 +68,7 @@ void QueryPlanStepRegistry::registerPlanSteps()
6768
registerTotalsHavingStep(registry);
6869
registerExtremesStep(registry);
6970
registerJoinStep(registry);
71+
registerObjectFilterStep(registry);
7072
}
7173

7274
}

src/Processors/QueryPlan/ReadFromRemote.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,7 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStream
459459
my_stage = stage, my_storage = storage,
460460
add_agg_info, add_totals, add_extremes, async_read, async_query_sending,
461461
query_tree = shard.query_tree, planner_context = shard.planner_context,
462-
pushed_down_filters]() mutable
462+
pushed_down_filters, my_is_remote_function = is_remote_function]() mutable
463463
-> QueryPipelineBuilder
464464
{
465465
auto current_settings = my_context->getSettingsRef();
@@ -543,6 +543,8 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStream
543543
{DataTypeUInt32().createColumnConst(1, my_shard.shard_info.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
544544
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
545545
std::move(connections), query_string, header, my_context, my_throttler, my_scalars, my_external_tables, my_stage);
546+
remote_query_executor->setRemoteFunction(my_is_remote_function);
547+
remote_query_executor->setShardCount(my_shard_count);
546548

547549
auto pipe = createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending);
548550
QueryPipelineBuilder builder;
@@ -627,6 +629,8 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
627629
priority_func);
628630
remote_query_executor->setLogger(log);
629631
remote_query_executor->setPoolMode(PoolMode::GET_ONE);
632+
remote_query_executor->setRemoteFunction(is_remote_function);
633+
remote_query_executor->setShardCount(shard_count);
630634

631635
if (!table_func_ptr)
632636
remote_query_executor->setMainTable(shard.main_table ? shard.main_table : main_table);
@@ -646,6 +650,8 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
646650
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
647651
shard.shard_info.pool, query_string, shard.header, context, throttler, scalars, external_tables, stage);
648652
remote_query_executor->setLogger(log);
653+
remote_query_executor->setRemoteFunction(is_remote_function);
654+
remote_query_executor->setShardCount(shard_count);
649655

650656
if (context->canUseTaskBasedParallelReplicas())
651657
{

src/Processors/QueryPlan/ReadFromRemote.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class ReadFromRemote final : public SourceStepWithFilterBase
4646

4747
void enableMemoryBoundMerging();
4848
void enforceAggregationInOrder();
49+
void setIsRemoteFunction(bool is_remote_function_ = true) { is_remote_function = is_remote_function_; }
4950

5051
private:
5152
ClusterProxy::SelectStreamFactory::Shards shards;
@@ -61,6 +62,7 @@ class ReadFromRemote final : public SourceStepWithFilterBase
6162
UInt32 shard_count;
6263
const String cluster_name;
6364
std::optional<GetPriorityForLoadBalancing> priority_func_factory;
65+
bool is_remote_function = false;
6466

6567
Pipes addPipes(const ClusterProxy::SelectStreamFactory::Shards & used_shards, const Header & out_header);
6668
void addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard, const Header & out_header);

0 commit comments

Comments
 (0)