diff --git a/pkg/cmd/proxy/main.go b/pkg/cmd/proxy/main.go index 1a205c7..bf17aca 100644 --- a/pkg/cmd/proxy/main.go +++ b/pkg/cmd/proxy/main.go @@ -126,7 +126,7 @@ func main() { } { - go tproxy.NewTProxy(*proxyIptablePort, faultInjectionMgr).Start() + go tproxy.NewTProxy(*proxyIptablePort, faultInjectionMgr, breakerMgr).Start() } serveHTTP(ctx, readyHandler) diff --git a/pkg/proxy/apiserver/handler.go b/pkg/proxy/apiserver/handler.go index f7722e9..e2768bb 100644 --- a/pkg/proxy/apiserver/handler.go +++ b/pkg/proxy/apiserver/handler.go @@ -51,7 +51,6 @@ import ( var ( upgradeSubresources = sets.NewString("exec", "attach") enableIpTable = os.Getenv(constants.EnvIPTable) == "true" - enableWebhookProxy = os.Getenv(constants.EnvEnableWebHookProxy) == "true" disableCircuitBreaker = os.Getenv(constants.EnvDisableCircuitBreaker) == "true" disableFaultInjection = os.Getenv(constants.EnvDisableCircuitBreaker) == "true" @@ -141,15 +140,9 @@ type handler struct { electionHandler leaderelection.Handler } -func getReqInfoStr(r *apirequest.RequestInfo) string { - return fmt.Sprintf("RequestInfo: { Path: %s, APIGroup: %s, Resource: %s, Subresource: %s, Verb: %s, Namespace: %s, Name: %s, APIVersion: %s }", r.Path, r.APIGroup, r.Resource, r.Subresource, r.Verb, r.Namespace, r.Name, r.APIVersion) -} - func (h *handler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { startTime := time.Now() requestInfo, ok := apirequest.RequestInfoFrom(r.Context()) - klog.Infof("handle http req %s", r.URL.String()) - klog.Infof(getReqInfoStr(requestInfo)) if !ok { klog.Errorf("%s %s %s, no request info in context", r.Method, r.Header.Get("Content-Type"), r.URL) http.Error(rw, "no request info in context", http.StatusBadRequest) @@ -216,7 +209,6 @@ func (h *handler) getURL(r *http.Request) *url.URL { u, _ := url.Parse(fmt.Sprintf("https://%s", r.Host)) if !enableIpTable { u, _ = url.Parse(fmt.Sprintf(h.cfg.Host)) - klog.Infof("disable IPTABLE, proxy apiServer with real host %s", u.String()) r.Host = "" } return u diff --git a/pkg/proxy/faultinjection/manager.go b/pkg/proxy/faultinjection/manager.go index 200f2a7..e8c3faf 100644 --- a/pkg/proxy/faultinjection/manager.go +++ b/pkg/proxy/faultinjection/manager.go @@ -278,7 +278,7 @@ func (m *manager) doFaultInjection(faultInjections []*ctrlmeshproto.HTTPFaultInj if isInpercentRange(faultInjections[idx].Delay.Percent) { delay := faultInjections[idx].Delay.GetFixedDelay() delayDuration := delay.AsDuration() - fmt.Println("Delaying for ", delayDuration) + logger.Info("Delaying time ", "for", delayDuration) time.Sleep(delayDuration) } } diff --git a/pkg/proxy/http/http_proxy.go b/pkg/proxy/http/http_proxy.go index 9e7ee21..c1be1ca 100644 --- a/pkg/proxy/http/http_proxy.go +++ b/pkg/proxy/http/http_proxy.go @@ -19,6 +19,7 @@ package http import ( "encoding/json" "fmt" + "os" "net/http" "net/url" @@ -27,14 +28,17 @@ import ( "k8s.io/klog/v2" logf "sigs.k8s.io/controller-runtime/pkg/log" + "github.com/KusionStack/controller-mesh/pkg/apis/ctrlmesh/constants" meshhttp "github.com/KusionStack/controller-mesh/pkg/apis/ctrlmesh/http" + "github.com/KusionStack/controller-mesh/pkg/proxy/circuitbreaker" "github.com/KusionStack/controller-mesh/pkg/proxy/faultinjection" "github.com/KusionStack/controller-mesh/pkg/utils" utilshttp "github.com/KusionStack/controller-mesh/pkg/utils/http" ) var ( - logger = logf.Log.WithName("http-proxy") + enableRestBreaker = os.Getenv(constants.EnvEnableRestCircuitBreaker) == "true" + logger = logf.Log.WithName("http-proxy") ) type ITProxy interface { @@ -42,14 +46,16 @@ type ITProxy interface { } type tproxy struct { - port int - FaultInjector faultinjection.ManagerInterface + port int + FaultInjector faultinjection.ManagerInterface + CircuitInjector circuitbreaker.ManagerInterface } -func NewTProxy(port int, faultInjector faultinjection.ManagerInterface) ITProxy { +func NewTProxy(port int, faultInjector faultinjection.ManagerInterface, circuitInjector circuitbreaker.ManagerInterface) ITProxy { return &tproxy{ - port: port, - FaultInjector: faultInjector, + port: port, + FaultInjector: faultInjector, + CircuitInjector: circuitInjector, } } @@ -59,7 +65,7 @@ func (t *tproxy) Start() { Addr: fmt.Sprintf(":%d", t.port), Handler: http.HandlerFunc(t.handleHTTP), } - logger.Info("%s", server.ListenAndServe()) + klog.Infof("%s", server.ListenAndServe()) } func (t *tproxy) handleHTTP(resp http.ResponseWriter, req *http.Request) { @@ -76,9 +82,10 @@ func (t *tproxy) handleHTTP(resp http.ResponseWriter, req *http.Request) { return } realEndPointUrl = epUrl - logger.Info("receive", "proxy-host", realEndPointUrl.Host, "proxy-method", req.Method, "Mesh-Real-Endpoint", realEp) + klog.Infof("receive, proxy-host: %s, proxy-method: %s, Mesh-Real-Endpoint: %s", realEndPointUrl.Host, req.Method, realEp) } - logger.Info("handel http request", "url", realEndPointUrl.String()) + klog.Infof("handel http request, url: %s ", realEndPointUrl.String()) + // faultinjection result := t.FaultInjector.FaultInjectionRest(req.Header.Get(meshhttp.HeaderMeshRealEndpoint), req.Method) if result.Abort { apiErr := utils.HttpToAPIError(int(result.ErrCode), req.Method, result.Message) @@ -87,10 +94,35 @@ func (t *tproxy) handleHTTP(resp http.ResponseWriter, req *http.Request) { if err := json.NewEncoder(resp).Encode(apiErr); err != nil { http.Error(resp, fmt.Sprintf("fail to inject fault %v", err), http.StatusInternalServerError) } - logger.Info("faultInjection rule", "rule", fmt.Sprintf("fault injection, %s, %s,%d", result.Reason, result.Message, result.ErrCode)) + klog.Infof("faultInjection rule, rule: %s", fmt.Sprintf("fault injection, %s, %s,%d", result.Reason, result.Message, result.ErrCode)) return } + // circuitbreaker + if enableRestBreaker { + // check request is in the whitelist + klog.Infof("start checktrafficrule %s", realEndPointUrl.Host) + result := t.CircuitInjector.ValidateTrafficIntercept(realEndPointUrl.Host, req.Method) + if !result.Allowed { + klog.Infof("ErrorTProxy: %s %s ValidateTrafficIntercept NOPASSED ,checkresult:\t%s", realEndPointUrl.Host, req.Method, result.Reason) + http.Error(resp, fmt.Sprintf("Forbidden by ValidateTrafficIntercept breaker, %s, %s", result.Message, result.Reason), http.StatusForbidden) + return + } + } + + // ValidateTrafficIntercept check pass or enableRestBreaker is false run http proxy + klog.Infof("TProxy: %s %s ValidateTrafficIntercept check PASSED or enableRestBreaker is false", realEndPointUrl.Host, req.Method) + + // ValidateRest check + klog.Infof("start ValidateRest checkrule %s %s", realEndPointUrl.Host, req.Method) + validateresult := t.CircuitInjector.ValidateRest(req.Header.Get("Mesh-Real-Endpoint"), req.Method) + if !validateresult.Allowed { + klog.Infof("ErrorTProxy: %s %s ValidateRest NOPASSED ,checkresult:%t, validateresultReason:%s", req.Header.Get("Mesh-Real-Endpoint"), req.Method, validateresult.Allowed, validateresult.Reason) + http.Error(resp, fmt.Sprintf("Forbidden by circuit ValidateRest breaker, %s, %s", validateresult.Message, validateresult.Reason), http.StatusForbidden) + return + } + klog.Infof("TProxy: %s %s ValidateRest check PASSED", realEndPointUrl.Host, req.Method) + // modify request director := func(target *http.Request) { target.Header.Set("Pass-Via-Go-TProxy", "1") diff --git a/pkg/proxy/http/http_proxy_test.go b/pkg/proxy/http/http_proxy_test.go index 2219735..0b5c205 100644 --- a/pkg/proxy/http/http_proxy_test.go +++ b/pkg/proxy/http/http_proxy_test.go @@ -26,6 +26,7 @@ import ( meshhttp "github.com/KusionStack/controller-mesh/pkg/apis/ctrlmesh/http" ctrlmeshproto "github.com/KusionStack/controller-mesh/pkg/apis/ctrlmesh/proto" + "github.com/KusionStack/controller-mesh/pkg/proxy/circuitbreaker" "github.com/KusionStack/controller-mesh/pkg/proxy/faultinjection" ) @@ -52,6 +53,7 @@ func TestTProxy(t *testing.T) { func StartProxy() { faultInjectionMgr := faultinjection.NewManager(context.TODO()) + circuitInjectionMgr := circuitbreaker.NewManager(context.TODO()) _, err := faultInjectionMgr.Sync(&ctrlmeshproto.FaultInjection{ Option: ctrlmeshproto.FaultInjection_UPDATE, ConfigHash: "123", @@ -119,6 +121,6 @@ func StartProxy() { }, }) utilruntime.Must(err) - tProxy := NewTProxy(15002, faultInjectionMgr) + tProxy := NewTProxy(15002, faultInjectionMgr, circuitInjectionMgr) tProxy.Start() }