diff --git a/pkg/common/utils/mysql/mysql.go b/pkg/common/utils/mysql/mysql.go index 41ba5a0f..0ed7941b 100644 --- a/pkg/common/utils/mysql/mysql.go +++ b/pkg/common/utils/mysql/mysql.go @@ -18,14 +18,18 @@ package mysql import ( - "database/sql" - "encoding/json" - "fmt" - "os" - - _ "github.com/go-sql-driver/mysql" - "github.com/jmoiron/sqlx" - "k8s.io/klog/v2" + "crypto/tls" + "crypto/x509" + "database/sql" + "encoding/json" + "errors" + "fmt" + + "github.com/go-sql-driver/mysql" + _ "github.com/go-sql-driver/mysql" + "github.com/jmoiron/sqlx" + corev1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" ) const ( @@ -40,6 +44,12 @@ type DBConfig struct { Database string } +type TLSConfig struct { + CAFileName string + ClientCertFileName string + ClientKeyFileName string +} + func NewDBConfig() DBConfig { return DBConfig{ Database: "mysql", @@ -50,11 +60,35 @@ type DB struct { *sqlx.DB } -func NewDorisSqlDB(cfg DBConfig) (*DB, error) { - if os.Getenv("DEBUG") == "true" { - cfg.Host = "10.152.183.86" - } +func NewDorisSqlDB(cfg DBConfig, tlsConfig *TLSConfig, secret *corev1.Secret) (*DB, error) { dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.Database) + rootCertPool := x509.NewCertPool() + + if tlsConfig != nil && secret != nil { + ca := secret.Data[tlsConfig.CAFileName] + clientCert := secret.Data[tlsConfig.ClientCertFileName] + clientKey := secret.Data[tlsConfig.ClientKeyFileName] + if ok := rootCertPool.AppendCertsFromPEM(ca); !ok { + klog.Errorf("NewDorisSqlDB append cert from pem failed") + return nil, errors.New("NewDorisSqlDB append cert from pem failed") + } + clientCerts := make([]tls.Certificate, 0, 1) + cCert, err := tls.X509KeyPair(clientCert, clientKey) + if err != nil { + return nil, errors.New("NewDorisSqlDB load x509 key pair failed," + err.Error()) + } + + clientCerts = append(clientCerts, cCert) + registerKey := secret.Namespace + "-" + secret.Name + if err = mysql.RegisterTLSConfig(registerKey, &tls.Config{ + RootCAs: rootCertPool, + Certificates: clientCerts, + }); err != nil { + return nil, errors.New("NewDorisSqlDB register tls config failed," + err.Error()) + } + dsn = dsn + "?tls=" + registerKey + } + db, err := sqlx.Open("mysql", dsn) if err != nil { klog.Errorf("NewDorisSqlDB sqlx.Open failed open doris sql client connection, err: %s \n", err.Error()) @@ -68,8 +102,8 @@ func NewDorisSqlDB(cfg DBConfig) (*DB, error) { return &DB{db}, nil } -func NewDorisMasterSqlDB(dbConf DBConfig) (*DB, error) { - loadBalanceDBClient, err := NewDorisSqlDB(dbConf) +func NewDorisMasterSqlDB(dbConf DBConfig, tlsConfig *TLSConfig, secret *corev1.Secret) (*DB, error) { + loadBalanceDBClient, err := NewDorisSqlDB(dbConf, tlsConfig, secret) if err != nil { klog.Errorf("NewDorisMasterSqlDB failed, get fe node connection err:%s", err.Error()) return nil, err @@ -92,7 +126,7 @@ func NewDorisMasterSqlDB(dbConf DBConfig) (*DB, error) { Host: master.Host, Port: dbConf.Port, Database: "mysql", - }) + }, tlsConfig, secret) if err != nil { klog.Errorf("NewDorisMasterSqlDB failed, get fe master connection err:%s", err.Error()) return nil, err diff --git a/pkg/common/utils/resource/configmap.go b/pkg/common/utils/resource/configmap.go index c29f76ac..ace35fae 100644 --- a/pkg/common/utils/resource/configmap.go +++ b/pkg/common/utils/resource/configmap.go @@ -20,11 +20,12 @@ package resource import ( "bytes" "errors" + "os" + dorisv1 "github.com/apache/doris-operator/api/doris/v1" "github.com/spf13/viper" corev1 "k8s.io/api/core/v1" "k8s.io/klog/v2" - "os" ) // the fe ports key @@ -46,13 +47,17 @@ const ( // the default ResolveKey const ( - FE_RESOLVEKEY = "fe.conf" - BE_RESOLVEKEY = "be.conf" - CN_RESOLVEKEY = "be.conf" - BROKER_RESOLVEKEY = "apache_hdfs_broker.conf" - MS_RESOLVEKEY = "doris_cloud.conf" - DefaultMsToken = "greedisgood9999" - DefaultMsTokenKey = "http_token" + FE_RESOLVEKEY = "fe.conf" + BE_RESOLVEKEY = "be.conf" + CN_RESOLVEKEY = "be.conf" + BROKER_RESOLVEKEY = "apache_hdfs_broker.conf" + MS_RESOLVEKEY = "doris_cloud.conf" + DefaultMsToken = "greedisgood9999" + DefaultMsTokenKey = "http_token" + ENABLE_TLS_KEY = "enable_tls" + TLS_CERTIFICATE_PATH_KEY = "tls_certificate_path" + TLS_PRIVATE_KEY_PATH_KEY = "tls_private_key_path" + TLS_CA_CERTIFICATE_PATH_KEY = "tls_ca_certificate_path" ) const ARROW_FLIGHT_SQL_PORT = "arrow_flight_sql_port" diff --git a/pkg/common/utils/resource/convert.go b/pkg/common/utils/resource/convert.go index 27abd334..2d5d668d 100644 --- a/pkg/common/utils/resource/convert.go +++ b/pkg/common/utils/resource/convert.go @@ -41,3 +41,11 @@ func GetTerminationGracePeriodSeconds(config map[string]interface{}) int64 { return 0 } + +func GetString(config map[string]interface{}, key string) string { + if v, ok := config[key]; ok { + return v.(string) + } + + return "" +} diff --git a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go index 0faf94fd..f3e642b9 100644 --- a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go +++ b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go @@ -167,14 +167,28 @@ func (dcgs *DisaggregatedComputeGroupsController) computeGroupSync(ctx context.C // reconcileStatefulset return bool means reconcile print error message. func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx context.Context, st *appv1.StatefulSet, cluster *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) (*sc.Event, error) { - var est appv1.StatefulSet - if err := dcgs.K8sclient.Get(ctx, types.NamespacedName{Namespace: st.Namespace, Name: st.Name}, &est); apierrors.IsNotFound(err) { + //use new default value before apply new statefulset, when creating and apply spec change. + ndf := func(st *appv1.StatefulSet, est *appv1.StatefulSet) { + dcgs.useNewDefaultValuesInStatefulset(st) + } + + var est appv1.StatefulSet + if err := dcgs.K8sclient.Get(ctx, types.NamespacedName{Namespace: st.Namespace, Name: st.Name}, &est); apierrors.IsNotFound(err) { // add downlaodAPI volume Mounts dcgs.DisaggregatedSubDefaultController.AddDownwardAPI(st) - if err = k8s.CreateClientObject(ctx, dcgs.K8sclient, st); err != nil { - klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset create statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error()) - return &sc.Event{Type: sc.EventWarning, Reason: sc.CGCreateResourceFailed, Message: err.Error()}, err - } + //if err = k8s.CreateClientObject(ctx, dcgs.K8sclient, st); err != nil { + // klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset create statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error()) + // return &sc.Event{Type: sc.EventWarning, Reason: sc.CGCreateResourceFailed, Message: err.Error()}, err + //} + + //use apply replace create, if use create the default image not replace with be image and annotation for equal not assign. + if err = k8s.ApplyStatefulSet(ctx, dcgs.K8sclient, st, func(st, est *appv1.StatefulSet) bool { + //creating use the function to assign equal annotation. + return resource.StatefulsetDeepEqualWithKey(st ,est, dv1.DisaggregatedSpecHashValueAnnotation, false) + }, ndf); err != nil { + klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset create statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error()) + return &sc.Event{Type: sc.EventWarning, Reason: sc.CGCreateResourceFailed, Message: err.Error()}, err + } return nil, nil } else if err != nil { @@ -191,10 +205,7 @@ func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx conte return nil, nil } - //use new default value before apply new statefulset - ndf := func(st *appv1.StatefulSet, est *appv1.StatefulSet) { - dcgs.useNewDefaultValuesInStatefulset(st) - } + if err := k8s.ApplyStatefulSet(ctx, dcgs.K8sclient, st, func(st, est *appv1.StatefulSet) bool { //store annotations "doris.disaggregated.cluster/generation={generation}" on statefulset //store annotations "doris.disaggregated.cluster/update-{uniqueid}=true/false" on DorisDisaggregatedCluster @@ -605,7 +616,10 @@ func(dcgs *DisaggregatedComputeGroupsController) recordComputeGroupIds(ddc *dv1. cfg.Host = host cfg.Port = strconv.FormatInt(int64(queryPort), 10) - db,err := mysql.NewDorisSqlDB(cfg) + tlsConfig, secretName := dcgs.DisaggregatedSubDefaultController.FindSecretTLSConfig(confMap, ddc) + secret, _ := k8s.GetSecret(context.Background(), dcgs.K8sclient, ddc.Namespace, secretName) + + db, err := mysql.NewDorisSqlDB(cfg, tlsConfig, secret) if err != nil { klog.Errorf("DisaggregatedComputeGroupsController recordComputeGroupIds new doris client failed,err=%s", err.Error()) return err diff --git a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go index 8fba7cc1..907d39e2 100644 --- a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go +++ b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go @@ -19,13 +19,15 @@ package computegroups import ( "context" + "strconv" + "strings" + dv1 "github.com/apache/doris-operator/api/disaggregated/v1" + "github.com/apache/doris-operator/pkg/common/utils/k8s" "github.com/apache/doris-operator/pkg/common/utils/mysql" "github.com/apache/doris-operator/pkg/common/utils/resource" appv1 "k8s.io/api/apps/v1" "k8s.io/klog/v2" - "strconv" - "strings" ) func (dcgs *DisaggregatedComputeGroupsController) preApplyStatefulSet(ctx context.Context, st, est *appv1.StatefulSet, cluster *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) error { @@ -178,8 +180,11 @@ func (dcgs *DisaggregatedComputeGroupsController) getMasterSqlClient(ctx context Port: strconv.FormatInt(int64(queryPort), 10), Database: "mysql", } + tlsConfig, secretName := dcgs.DisaggregatedSubDefaultController.FindSecretTLSConfig(confMap, cluster) + secret, _ := k8s.GetSecret(context.Background(), dcgs.K8sclient, cluster.Namespace, secretName) + // Connect to the master and run the SQL statement of system admin, because it is not excluded that the user can shrink be and fe at the same time - masterDBClient, err := mysql.NewDorisMasterSqlDB(dbConf) + masterDBClient, err := mysql.NewDorisMasterSqlDB(dbConf, tlsConfig, secret) if err != nil { klog.Errorf("getMasterSqlClient NewDorisMasterSqlDB failed for ddc %s namespace %s, get fe node connection err:%s", cluster.Namespace, cluster.Name, err.Error()) return nil, err @@ -226,7 +231,7 @@ func getScaledOutBENode( return dropNodes, nil } -//if in decommission, skip apply statefulset. +// if in decommission, skip apply statefulset. func skipApplyStatefulset(ddc *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) bool { var cgStatus *dv1.ComputeGroupStatus uniqueId := cg.UniqueId diff --git a/pkg/controller/sub_controller/disaggregated_cluster/disaggregated_fe/controller.go b/pkg/controller/sub_controller/disaggregated_cluster/disaggregated_fe/controller.go index 46883272..0af5b9d2 100644 --- a/pkg/controller/sub_controller/disaggregated_cluster/disaggregated_fe/controller.go +++ b/pkg/controller/sub_controller/disaggregated_cluster/disaggregated_fe/controller.go @@ -21,6 +21,9 @@ import ( "context" "errors" "fmt" + "strconv" + "strings" + "github.com/apache/doris-operator/api/disaggregated/v1" "github.com/apache/doris-operator/pkg/common/utils/k8s" "github.com/apache/doris-operator/pkg/common/utils/mysql" @@ -33,8 +36,6 @@ import ( "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "strconv" - "strings" ) var _ sc.DisaggregatedSubController = &DisaggregatedFEController{} @@ -350,6 +351,8 @@ func (dfc *DisaggregatedFEController) dropFEBySQLClient(ctx context.Context, k8s host := cluster.GetFEVIPAddresss() confMap := dfc.GetConfigValuesFromConfigMaps(cluster.Namespace, resource.FE_RESOLVEKEY, cluster.Spec.FeSpec.ConfigMaps) queryPort := resource.GetPort(confMap, resource.QUERY_PORT) + tlsConfig, secretName := dfc.DisaggregatedSubDefaultController.FindSecretTLSConfig(confMap, cluster) + secret, _ := k8s.GetSecret(context.Background(), dfc.K8sclient, cluster.Namespace, secretName) // connect to doris sql to get master node // It may not be the master, or even the node that needs to be deleted, causing the deletion SQL to fail. @@ -360,7 +363,7 @@ func (dfc *DisaggregatedFEController) dropFEBySQLClient(ctx context.Context, k8s Port: strconv.FormatInt(int64(queryPort), 10), Database: "mysql", } - masterDBClient, err := mysql.NewDorisMasterSqlDB(dbConf) + masterDBClient, err := mysql.NewDorisMasterSqlDB(dbConf, tlsConfig, secret) if err != nil { klog.Errorf("NewDorisMasterSqlDB failed, get fe node connection err:%s", err.Error()) return err diff --git a/pkg/controller/sub_controller/disaggregated_subcontroller.go b/pkg/controller/sub_controller/disaggregated_subcontroller.go index 8ced89dc..ee3928fc 100644 --- a/pkg/controller/sub_controller/disaggregated_subcontroller.go +++ b/pkg/controller/sub_controller/disaggregated_subcontroller.go @@ -22,9 +22,16 @@ import ( "context" "encoding/json" "fmt" + "os" + "path" + "path/filepath" + "strconv" + "strings" + "github.com/apache/doris-operator/api/disaggregated/v1" "github.com/apache/doris-operator/pkg/common/utils/k8s" "github.com/apache/doris-operator/pkg/common/utils/metadata" + "github.com/apache/doris-operator/pkg/common/utils/mysql" "github.com/apache/doris-operator/pkg/common/utils/resource" "github.com/apache/doris-operator/pkg/common/utils/set" "github.com/spf13/viper" @@ -34,10 +41,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" - "os" "sigs.k8s.io/controller-runtime/pkg/client" - "strconv" - "strings" ) const ( @@ -768,3 +772,34 @@ func (d *DisaggregatedSubDefaultController) getFEMetaPath(confMap map[string]int } return v.(string) } + +func (d *DisaggregatedSubDefaultController) FindSecretTLSConfig(feConfMap map[string]interface{}, ddc *v1.DorisDisaggregatedCluster) (*mysql.TLSConfig, string /*secret name*/) { + enableTLS := resource.GetString(feConfMap, resource.ENABLE_TLS_KEY) + if enableTLS == "" { + return nil, "" + } + + caCertFile := resource.GetString(feConfMap, resource.TLS_CA_CERTIFICATE_PATH_KEY) + clientCertFile := resource.GetString(feConfMap, resource.TLS_CERTIFICATE_PATH_KEY) + clientKeyFile := resource.GetString(feConfMap, resource.TLS_PRIVATE_KEY_PATH_KEY) + caFileName := path.Base(caCertFile) + clientCertFileName := path.Base(clientCertFile) + clientKeyFileName := path.Base(clientKeyFile) + + caCertDir := filepath.Dir(caCertFile) + secretName := "" + for _, sn := range ddc.Spec.FeSpec.Secrets { + if sn.MountPath == caCertDir { + secretName = sn.SecretName + break + } + } + + tlsConfig := &mysql.TLSConfig{ + CAFileName: caFileName, + ClientCertFileName: clientCertFileName, + ClientKeyFileName: clientKeyFileName, + } + + return tlsConfig, secretName +} diff --git a/pkg/controller/sub_controller/fe/prepare_modify.go b/pkg/controller/sub_controller/fe/prepare_modify.go index 2afee13c..5b835e84 100644 --- a/pkg/controller/sub_controller/fe/prepare_modify.go +++ b/pkg/controller/sub_controller/fe/prepare_modify.go @@ -96,6 +96,7 @@ func (fc *Controller) safeScaleDown(cluster *v1.DorisCluster, ost *appv1.Statefu return } + // dropObserverBySqlClient handles doris'SQL(drop frontend) through the MySQL client when dealing with scale in observer // targetDCR is new dcr func (fc *Controller) dropObserverBySqlClient(ctx context.Context, k8sclient client.Client, targetDCR *v1.DorisCluster) error { @@ -118,7 +119,7 @@ func (fc *Controller) dropObserverBySqlClient(ctx context.Context, k8sclient cli Port: strconv.FormatInt(int64(queryPort), 10), Database: "mysql", } - masterDBClient, err := mysql.NewDorisMasterSqlDB(dbConf) + masterDBClient, err := mysql.NewDorisMasterSqlDB(dbConf, nil, nil) if err != nil { klog.Errorf("NewDorisMasterSqlDB failed, get fe node connection err:%s", err.Error()) return err