Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 49 additions & 15 deletions pkg/common/utils/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
package mysql

import (
"database/sql"
"encoding/json"
"fmt"
"os"

_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
"k8s.io/klog/v2"
"crypto/tls"
"crypto/x509"
"database/sql"
"encoding/json"
"errors"
"fmt"

"github.com/go-sql-driver/mysql"
_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
)

const (
Expand All @@ -40,6 +44,12 @@ type DBConfig struct {
Database string
}

type TLSConfig struct {
CAFileName string
ClientCertFileName string
ClientKeyFileName string
}

func NewDBConfig() DBConfig {
return DBConfig{
Database: "mysql",
Expand All @@ -50,11 +60,35 @@ type DB struct {
*sqlx.DB
}

func NewDorisSqlDB(cfg DBConfig) (*DB, error) {
if os.Getenv("DEBUG") == "true" {
cfg.Host = "10.152.183.86"
}
func NewDorisSqlDB(cfg DBConfig, tlsConfig *TLSConfig, secret *corev1.Secret) (*DB, error) {
dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.Database)
rootCertPool := x509.NewCertPool()

if tlsConfig != nil && secret != nil {
ca := secret.Data[tlsConfig.CAFileName]
clientCert := secret.Data[tlsConfig.ClientCertFileName]
clientKey := secret.Data[tlsConfig.ClientKeyFileName]
if ok := rootCertPool.AppendCertsFromPEM(ca); !ok {
klog.Errorf("NewDorisSqlDB append cert from pem failed")
return nil, errors.New("NewDorisSqlDB append cert from pem failed")
}
clientCerts := make([]tls.Certificate, 0, 1)
cCert, err := tls.X509KeyPair(clientCert, clientKey)
if err != nil {
return nil, errors.New("NewDorisSqlDB load x509 key pair failed," + err.Error())
}

clientCerts = append(clientCerts, cCert)
registerKey := secret.Namespace + "-" + secret.Name
if err = mysql.RegisterTLSConfig(registerKey, &tls.Config{
RootCAs: rootCertPool,
Certificates: clientCerts,
}); err != nil {
return nil, errors.New("NewDorisSqlDB register tls config failed," + err.Error())
}
dsn = dsn + "?tls=" + registerKey
}

db, err := sqlx.Open("mysql", dsn)
if err != nil {
klog.Errorf("NewDorisSqlDB sqlx.Open failed open doris sql client connection, err: %s \n", err.Error())
Expand All @@ -68,8 +102,8 @@ func NewDorisSqlDB(cfg DBConfig) (*DB, error) {
return &DB{db}, nil
}

func NewDorisMasterSqlDB(dbConf DBConfig) (*DB, error) {
loadBalanceDBClient, err := NewDorisSqlDB(dbConf)
func NewDorisMasterSqlDB(dbConf DBConfig, tlsConfig *TLSConfig, secret *corev1.Secret) (*DB, error) {
loadBalanceDBClient, err := NewDorisSqlDB(dbConf, tlsConfig, secret)
if err != nil {
klog.Errorf("NewDorisMasterSqlDB failed, get fe node connection err:%s", err.Error())
return nil, err
Expand All @@ -92,7 +126,7 @@ func NewDorisMasterSqlDB(dbConf DBConfig) (*DB, error) {
Host: master.Host,
Port: dbConf.Port,
Database: "mysql",
})
}, tlsConfig, secret)
if err != nil {
klog.Errorf("NewDorisMasterSqlDB failed, get fe master connection err:%s", err.Error())
return nil, err
Expand Down
21 changes: 13 additions & 8 deletions pkg/common/utils/resource/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ package resource
import (
"bytes"
"errors"
"os"

dorisv1 "github.com/apache/doris-operator/api/doris/v1"
"github.com/spf13/viper"
corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"os"
)

// the fe ports key
Expand All @@ -46,13 +47,17 @@ const (

// the default ResolveKey
const (
FE_RESOLVEKEY = "fe.conf"
BE_RESOLVEKEY = "be.conf"
CN_RESOLVEKEY = "be.conf"
BROKER_RESOLVEKEY = "apache_hdfs_broker.conf"
MS_RESOLVEKEY = "doris_cloud.conf"
DefaultMsToken = "greedisgood9999"
DefaultMsTokenKey = "http_token"
FE_RESOLVEKEY = "fe.conf"
BE_RESOLVEKEY = "be.conf"
CN_RESOLVEKEY = "be.conf"
BROKER_RESOLVEKEY = "apache_hdfs_broker.conf"
MS_RESOLVEKEY = "doris_cloud.conf"
DefaultMsToken = "greedisgood9999"
DefaultMsTokenKey = "http_token"
ENABLE_TLS_KEY = "enable_tls"
TLS_CERTIFICATE_PATH_KEY = "tls_certificate_path"
TLS_PRIVATE_KEY_PATH_KEY = "tls_private_key_path"
TLS_CA_CERTIFICATE_PATH_KEY = "tls_ca_certificate_path"
)

const ARROW_FLIGHT_SQL_PORT = "arrow_flight_sql_port"
Expand Down
8 changes: 8 additions & 0 deletions pkg/common/utils/resource/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,11 @@ func GetTerminationGracePeriodSeconds(config map[string]interface{}) int64 {

return 0
}

func GetString(config map[string]interface{}, key string) string {
if v, ok := config[key]; ok {
return v.(string)
}

return ""
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,28 @@ func (dcgs *DisaggregatedComputeGroupsController) computeGroupSync(ctx context.C

// reconcileStatefulset return bool means reconcile print error message.
func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx context.Context, st *appv1.StatefulSet, cluster *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) (*sc.Event, error) {
var est appv1.StatefulSet
if err := dcgs.K8sclient.Get(ctx, types.NamespacedName{Namespace: st.Namespace, Name: st.Name}, &est); apierrors.IsNotFound(err) {
//use new default value before apply new statefulset, when creating and apply spec change.
ndf := func(st *appv1.StatefulSet, est *appv1.StatefulSet) {
dcgs.useNewDefaultValuesInStatefulset(st)
}

var est appv1.StatefulSet
if err := dcgs.K8sclient.Get(ctx, types.NamespacedName{Namespace: st.Namespace, Name: st.Name}, &est); apierrors.IsNotFound(err) {
// add downlaodAPI volume Mounts
dcgs.DisaggregatedSubDefaultController.AddDownwardAPI(st)
if err = k8s.CreateClientObject(ctx, dcgs.K8sclient, st); err != nil {
klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset create statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
return &sc.Event{Type: sc.EventWarning, Reason: sc.CGCreateResourceFailed, Message: err.Error()}, err
}
//if err = k8s.CreateClientObject(ctx, dcgs.K8sclient, st); err != nil {
// klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset create statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
// return &sc.Event{Type: sc.EventWarning, Reason: sc.CGCreateResourceFailed, Message: err.Error()}, err
//}

//use apply replace create, if use create the default image not replace with be image and annotation for equal not assign.
if err = k8s.ApplyStatefulSet(ctx, dcgs.K8sclient, st, func(st, est *appv1.StatefulSet) bool {
//creating use the function to assign equal annotation.
return resource.StatefulsetDeepEqualWithKey(st ,est, dv1.DisaggregatedSpecHashValueAnnotation, false)
}, ndf); err != nil {
klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset create statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
return &sc.Event{Type: sc.EventWarning, Reason: sc.CGCreateResourceFailed, Message: err.Error()}, err
}

return nil, nil
} else if err != nil {
Expand All @@ -191,10 +205,7 @@ func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx conte
return nil, nil
}

//use new default value before apply new statefulset
ndf := func(st *appv1.StatefulSet, est *appv1.StatefulSet) {
dcgs.useNewDefaultValuesInStatefulset(st)
}

if err := k8s.ApplyStatefulSet(ctx, dcgs.K8sclient, st, func(st, est *appv1.StatefulSet) bool {
//store annotations "doris.disaggregated.cluster/generation={generation}" on statefulset
//store annotations "doris.disaggregated.cluster/update-{uniqueid}=true/false" on DorisDisaggregatedCluster
Expand Down Expand Up @@ -605,7 +616,10 @@ func(dcgs *DisaggregatedComputeGroupsController) recordComputeGroupIds(ddc *dv1.
cfg.Host = host
cfg.Port = strconv.FormatInt(int64(queryPort), 10)

db,err := mysql.NewDorisSqlDB(cfg)
tlsConfig, secretName := dcgs.DisaggregatedSubDefaultController.FindSecretTLSConfig(confMap, ddc)
secret, _ := k8s.GetSecret(context.Background(), dcgs.K8sclient, ddc.Namespace, secretName)

db, err := mysql.NewDorisSqlDB(cfg, tlsConfig, secret)
if err != nil {
klog.Errorf("DisaggregatedComputeGroupsController recordComputeGroupIds new doris client failed,err=%s", err.Error())
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ package computegroups

import (
"context"
"strconv"
"strings"

dv1 "github.com/apache/doris-operator/api/disaggregated/v1"
"github.com/apache/doris-operator/pkg/common/utils/k8s"
"github.com/apache/doris-operator/pkg/common/utils/mysql"
"github.com/apache/doris-operator/pkg/common/utils/resource"
appv1 "k8s.io/api/apps/v1"
"k8s.io/klog/v2"
"strconv"
"strings"
)

func (dcgs *DisaggregatedComputeGroupsController) preApplyStatefulSet(ctx context.Context, st, est *appv1.StatefulSet, cluster *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) error {
Expand Down Expand Up @@ -178,8 +180,11 @@ func (dcgs *DisaggregatedComputeGroupsController) getMasterSqlClient(ctx context
Port: strconv.FormatInt(int64(queryPort), 10),
Database: "mysql",
}
tlsConfig, secretName := dcgs.DisaggregatedSubDefaultController.FindSecretTLSConfig(confMap, cluster)
secret, _ := k8s.GetSecret(context.Background(), dcgs.K8sclient, cluster.Namespace, secretName)

// Connect to the master and run the SQL statement of system admin, because it is not excluded that the user can shrink be and fe at the same time
masterDBClient, err := mysql.NewDorisMasterSqlDB(dbConf)
masterDBClient, err := mysql.NewDorisMasterSqlDB(dbConf, tlsConfig, secret)
if err != nil {
klog.Errorf("getMasterSqlClient NewDorisMasterSqlDB failed for ddc %s namespace %s, get fe node connection err:%s", cluster.Namespace, cluster.Name, err.Error())
return nil, err
Expand Down Expand Up @@ -226,7 +231,7 @@ func getScaledOutBENode(
return dropNodes, nil
}

//if in decommission, skip apply statefulset.
// if in decommission, skip apply statefulset.
func skipApplyStatefulset(ddc *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) bool {
var cgStatus *dv1.ComputeGroupStatus
uniqueId := cg.UniqueId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"context"
"errors"
"fmt"
"strconv"
"strings"

"github.com/apache/doris-operator/api/disaggregated/v1"
"github.com/apache/doris-operator/pkg/common/utils/k8s"
"github.com/apache/doris-operator/pkg/common/utils/mysql"
Expand All @@ -33,8 +36,6 @@ import (
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"strconv"
"strings"
)

var _ sc.DisaggregatedSubController = &DisaggregatedFEController{}
Expand Down Expand Up @@ -350,6 +351,8 @@ func (dfc *DisaggregatedFEController) dropFEBySQLClient(ctx context.Context, k8s
host := cluster.GetFEVIPAddresss()
confMap := dfc.GetConfigValuesFromConfigMaps(cluster.Namespace, resource.FE_RESOLVEKEY, cluster.Spec.FeSpec.ConfigMaps)
queryPort := resource.GetPort(confMap, resource.QUERY_PORT)
tlsConfig, secretName := dfc.DisaggregatedSubDefaultController.FindSecretTLSConfig(confMap, cluster)
secret, _ := k8s.GetSecret(context.Background(), dfc.K8sclient, cluster.Namespace, secretName)

// connect to doris sql to get master node
// It may not be the master, or even the node that needs to be deleted, causing the deletion SQL to fail.
Expand All @@ -360,7 +363,7 @@ func (dfc *DisaggregatedFEController) dropFEBySQLClient(ctx context.Context, k8s
Port: strconv.FormatInt(int64(queryPort), 10),
Database: "mysql",
}
masterDBClient, err := mysql.NewDorisMasterSqlDB(dbConf)
masterDBClient, err := mysql.NewDorisMasterSqlDB(dbConf, tlsConfig, secret)
if err != nil {
klog.Errorf("NewDorisMasterSqlDB failed, get fe node connection err:%s", err.Error())
return err
Expand Down
41 changes: 38 additions & 3 deletions pkg/controller/sub_controller/disaggregated_subcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,16 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"path"
"path/filepath"
"strconv"
"strings"

"github.com/apache/doris-operator/api/disaggregated/v1"
"github.com/apache/doris-operator/pkg/common/utils/k8s"
"github.com/apache/doris-operator/pkg/common/utils/metadata"
"github.com/apache/doris-operator/pkg/common/utils/mysql"
"github.com/apache/doris-operator/pkg/common/utils/resource"
"github.com/apache/doris-operator/pkg/common/utils/set"
"github.com/spf13/viper"
Expand All @@ -34,10 +41,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"os"
"sigs.k8s.io/controller-runtime/pkg/client"
"strconv"
"strings"
)

const (
Expand Down Expand Up @@ -768,3 +772,34 @@ func (d *DisaggregatedSubDefaultController) getFEMetaPath(confMap map[string]int
}
return v.(string)
}

func (d *DisaggregatedSubDefaultController) FindSecretTLSConfig(feConfMap map[string]interface{}, ddc *v1.DorisDisaggregatedCluster) (*mysql.TLSConfig, string /*secret name*/) {
enableTLS := resource.GetString(feConfMap, resource.ENABLE_TLS_KEY)
if enableTLS == "" {
return nil, ""
}

caCertFile := resource.GetString(feConfMap, resource.TLS_CA_CERTIFICATE_PATH_KEY)
clientCertFile := resource.GetString(feConfMap, resource.TLS_CERTIFICATE_PATH_KEY)
clientKeyFile := resource.GetString(feConfMap, resource.TLS_PRIVATE_KEY_PATH_KEY)
caFileName := path.Base(caCertFile)
clientCertFileName := path.Base(clientCertFile)
clientKeyFileName := path.Base(clientKeyFile)

caCertDir := filepath.Dir(caCertFile)
secretName := ""
for _, sn := range ddc.Spec.FeSpec.Secrets {
if sn.MountPath == caCertDir {
secretName = sn.SecretName
break
}
}

tlsConfig := &mysql.TLSConfig{
CAFileName: caFileName,
ClientCertFileName: clientCertFileName,
ClientKeyFileName: clientKeyFileName,
}

return tlsConfig, secretName
}
3 changes: 2 additions & 1 deletion pkg/controller/sub_controller/fe/prepare_modify.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func (fc *Controller) safeScaleDown(cluster *v1.DorisCluster, ost *appv1.Statefu

return
}

// dropObserverBySqlClient handles doris'SQL(drop frontend) through the MySQL client when dealing with scale in observer
// targetDCR is new dcr
func (fc *Controller) dropObserverBySqlClient(ctx context.Context, k8sclient client.Client, targetDCR *v1.DorisCluster) error {
Expand All @@ -118,7 +119,7 @@ func (fc *Controller) dropObserverBySqlClient(ctx context.Context, k8sclient cli
Port: strconv.FormatInt(int64(queryPort), 10),
Database: "mysql",
}
masterDBClient, err := mysql.NewDorisMasterSqlDB(dbConf)
masterDBClient, err := mysql.NewDorisMasterSqlDB(dbConf, nil, nil)
if err != nil {
klog.Errorf("NewDorisMasterSqlDB failed, get fe node connection err:%s", err.Error())
return err
Expand Down