Skip to content

Commit 586d2fa

Browse files
authored
Fix in hostname in TKafkaFindCoordinatorActor (#29893) (#29914)
2 parents 7bec12d + a88e63d commit 586d2fa

File tree

4 files changed

+12
-27
lines changed

4 files changed

+12
-27
lines changed

ydb/core/driver_lib/run/kikimr_services_initializers.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1832,11 +1832,11 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se
18321832
}
18331833

18341834
if (Config.GetKafkaProxyConfig().GetEnableKafkaProxy()) {
1835-
const auto& kakfaConfig = Config.GetKafkaProxyConfig();
1835+
const auto& kafkaConfig = Config.GetKafkaProxyConfig();
18361836
TIntrusivePtr<NGRpcService::TGrpcEndpointDescription> desc = new NGRpcService::TGrpcEndpointDescription();
18371837
desc->Address = config.GetPublicHost() ? config.GetPublicHost() : address;
1838-
desc->Port = kakfaConfig.GetListeningPort();
1839-
desc->Ssl = kakfaConfig.HasSslCertificate();
1838+
desc->Port = kafkaConfig.GetListeningPort();
1839+
desc->Ssl = kafkaConfig.HasSslCertificate();
18401840

18411841
desc->EndpointId = NGRpcService::KafkaEndpointId;
18421842
endpoints.push_back(std::move(desc));
@@ -3037,6 +3037,11 @@ void TKafkaProxyServiceInitializer::InitializeServices(NActors::TActorSystemSetu
30373037
TMailboxType::HTSwap, appData->UserPoolId
30383038
)
30393039
);
3040+
const auto &grpcConfig = Config.GetGRpcConfig();
3041+
const TString &address = grpcConfig.GetHost() && grpcConfig.GetHost() != "[::]" ? grpcConfig.GetHost() : FQDNHostName();
3042+
auto& kafkaMutableConfig = *Config.MutableKafkaProxyConfig();
3043+
kafkaMutableConfig.SetPublicHost(grpcConfig.GetPublicHost() ? grpcConfig.GetPublicHost() : address);
3044+
30403045
setup->LocalServices.emplace_back(
30413046
TActorId(),
30423047
TActorSetupCmd(NKafka::CreateKafkaListener(MakePollerActorId(), settings, Config.GetKafkaProxyConfig()),

ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,7 @@ void TKafkaFindCoordinatorActor::Bootstrap(const NActors::TActorContext& ctx) {
2727
return;
2828
}
2929

30-
Send(NKikimr::NIcNodeCache::CreateICNodesInfoCacheServiceId(), new NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoRequest());
31-
Become(&TKafkaFindCoordinatorActor::StateWork);
30+
SendResponseOkAndDie(Context->Config.GetPublicHost(), Context->Config.GetListeningPort(), ctx.SelfID.NodeId(), ctx);
3231
}
3332

3433
void TKafkaFindCoordinatorActor::SendResponseOkAndDie(const TString& host, i32 port, ui64 nodeId, const NActors::TActorContext& ctx) {
@@ -73,21 +72,8 @@ void TKafkaFindCoordinatorActor::SendResponseFailAndDie(EKafkaErrors error, cons
7372
}
7473

7574
response->ErrorCode = error;
76-
75+
7776
Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast<EKafkaErrors>(response->ErrorCode)));
7877
Die(ctx);
7978
}
80-
81-
void TKafkaFindCoordinatorActor::Handle(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev, const NActors::TActorContext& ctx) {
82-
auto iter = ev->Get()->NodeIdsMapping->find(ctx.SelfID.NodeId());
83-
Y_ABORT_UNLESS(!iter.IsEnd());
84-
85-
auto host = (*ev->Get()->Nodes)[iter->second].Host;
86-
if (host.StartsWith(UnderlayPrefix)) {
87-
host = host.substr(sizeof(UnderlayPrefix) - 1);
88-
}
89-
KAFKA_LOG_D("FIND_COORDINATOR incoming TEvGetAllNodesInfoResponse. Host#: " << host);
90-
SendResponseOkAndDie(host, Context->Config.GetListeningPort(), ctx.SelfID.NodeId(), ctx);
91-
}
92-
9379
} // NKafka

ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.h

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,8 @@ class TKafkaFindCoordinatorActor: public NActors::TActorBootstrapped<TKafkaFindC
1616
}
1717

1818
void Bootstrap(const NActors::TActorContext& ctx);
19-
20-
private:
21-
STATEFN(StateWork) {
22-
switch (ev->GetTypeRewrite()) {
23-
HFunc(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse, Handle);
24-
}
25-
}
2619

27-
void Handle(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev, const NActors::TActorContext& ctx);
20+
private:
2821

2922
void SendResponseOkAndDie(const TString& host, i32 port, ui64 nodeId, const NActors::TActorContext& ctx);
3023
void SendResponseFailAndDie(EKafkaErrors error, const TString& message, const NActors::TActorContext& ctx);

ydb/core/protos/config.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2504,6 +2504,7 @@ message TKafkaProxyConfig {
25042504
optional bool AutoCreateTopicsEnable = 13 [default = false];
25052505
optional uint32 TopicCreationDefaultPartitions = 14 [default = 1];
25062506
optional bool AutoCreateConsumersEnable = 15 [default = true];
2507+
optional string PublicHost = 16;
25072508
}
25082509

25092510
message TAwsCompatibilityConfig {

0 commit comments

Comments
 (0)