diff --git a/prog/app.go b/prog/app.go index d4b9ca8aab..7b517100e5 100644 --- a/prog/app.go +++ b/prog/app.go @@ -13,13 +13,12 @@ import ( "time" log "github.com/Sirupsen/logrus" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/credentials" "github.com/gorilla/mux" "github.com/prometheus/client_golang/prometheus" "github.com/tylerb/graceful" billing "github.com/weaveworks/billing-client" + "github.com/weaveworks/common/aws" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/network" "github.com/weaveworks/go-checkpoint" @@ -76,21 +75,6 @@ func router(collector app.Collector, controlRouter app.ControlRouter, pipeRouter return instrument.Wrap(router) } -func awsConfigFromURL(url *url.URL) (*aws.Config, error) { - if url.User == nil { - return nil, fmt.Errorf("Must specify username & password in URL") - } - password, _ := url.User.Password() - creds := credentials.NewStaticCredentials(url.User.Username(), password, "") - config := aws.NewConfig().WithCredentials(creds) - if strings.Contains(url.Host, ".") { - config = config.WithEndpoint(fmt.Sprintf("http://%s", url.Host)).WithRegion("dummy") - } else { - config = config.WithRegion(url.Host) - } - return config, nil -} - func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL, natsHostname string, memcacheConfig multitenant.MemcacheConfig, window time.Duration, createTables bool) (app.Collector, error) { if collectorURL == "local" { @@ -110,11 +94,11 @@ func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL, natsHo if err != nil { return nil, fmt.Errorf("Valid URL for s3 required: %v", err) } - dynamoDBConfig, err := awsConfigFromURL(parsed) + dynamoDBConfig, err := aws.ConfigFromURL(parsed) if err != nil { return nil, err } - s3Config, err := awsConfigFromURL(s3) + s3Config, err := aws.ConfigFromURL(s3) if err != nil { return nil, err } @@ -175,7 +159,7 @@ func controlRouterFactory(userIDer multitenant.UserIDer, controlRouterURL string if parsed.Scheme == "sqs" { prefix := strings.TrimPrefix(parsed.Path, "/") - sqsConfig, err := awsConfigFromURL(parsed) + sqsConfig, err := aws.ConfigFromURL(parsed) if err != nil { return nil, err } diff --git a/vendor/github.com/weaveworks/common/aws/config.go b/vendor/github.com/weaveworks/common/aws/config.go new file mode 100644 index 0000000000..dcba81fa50 --- /dev/null +++ b/vendor/github.com/weaveworks/common/aws/config.go @@ -0,0 +1,53 @@ +package aws + +import ( + "fmt" + "net" + "net/http" + "net/url" + "strings" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" +) + +// ConfigFromURL returns AWS config from given URL. It expects escaped +// AWS Access key ID & Secret Access Key to be encoded in the URL. It +// also expects region specified as a host (letting AWS generate full +// endpoint) or fully valid endpoint with dummy region assumed (e.g +// for URLs to emulated services). +func ConfigFromURL(awsURL *url.URL) (*aws.Config, error) { + if awsURL.User == nil { + return nil, fmt.Errorf("must specify escaped Access Key & Secret Access in URL") + } + + password, _ := awsURL.User.Password() + creds := credentials.NewStaticCredentials(awsURL.User.Username(), password, "") + config := aws.NewConfig(). + WithCredentials(creds). + // Use a custom http.Client with the golang defaults but also specifying + // MaxIdleConnsPerHost because of a bug in golang https://github.com/golang/go/issues/13801 + // where MaxIdleConnsPerHost does not work as expected. + WithHTTPClient(&http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + DualStack: true, + }).DialContext, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + MaxIdleConnsPerHost: 100, + TLSHandshakeTimeout: 3 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + }, + }) + if strings.Contains(awsURL.Host, ".") { + return config.WithEndpoint(fmt.Sprintf("http://%s", awsURL.Host)).WithRegion("dummy"), nil + } + + // Let AWS generate default endpoint based on region passed as a host in URL. + return config.WithRegion(awsURL.Host), nil +} diff --git a/vendor/github.com/weaveworks/common/http/client/client.go b/vendor/github.com/weaveworks/common/http/client/client.go new file mode 100644 index 0000000000..9762aba6de --- /dev/null +++ b/vendor/github.com/weaveworks/common/http/client/client.go @@ -0,0 +1,35 @@ +package client + +import ( + "context" + "fmt" + "net/http" + "strconv" + + "github.com/prometheus/client_golang/prometheus" + "github.com/weaveworks/common/instrument" + oldcontext "golang.org/x/net/context" +) + +// Requester executes an HTTP request. +type Requester interface { + Do(req *http.Request) (*http.Response, error) +} + +// TimeRequestHistogram performs an HTTP client request and records the duration in a histogram +func TimeRequestHistogram(ctx context.Context, operation string, metric *prometheus.HistogramVec, client Requester, request *http.Request) (*http.Response, error) { + var response *http.Response + doRequest := func(_ oldcontext.Context) error { + var err error + response, err = client.Do(request) + return err + } + toStatusCode := func(err error) string { + if err == nil { + return strconv.Itoa(response.StatusCode) + } + return "error" + } + err := instrument.TimeRequestHistogramStatus(ctx, fmt.Sprintf("%s %s", request.Method, operation), metric, toStatusCode, doRequest) + return response, err +} diff --git a/vendor/github.com/weaveworks/common/http/http.go b/vendor/github.com/weaveworks/common/http/http.go new file mode 100644 index 0000000000..d02cfda642 --- /dev/null +++ b/vendor/github.com/weaveworks/common/http/http.go @@ -0,0 +1 @@ +package http diff --git a/vendor/github.com/weaveworks/common/logging/logging.go b/vendor/github.com/weaveworks/common/logging/logging.go index 034b51aa47..c10b7fcb75 100644 --- a/vendor/github.com/weaveworks/common/logging/logging.go +++ b/vendor/github.com/weaveworks/common/logging/logging.go @@ -5,6 +5,8 @@ import ( "fmt" "os" "strings" + "sync" + "time" "golang.org/x/net/context" @@ -12,6 +14,10 @@ import ( "github.com/weaveworks/common/user" ) +const ( + defaultDedupeInterval = time.Minute +) + // Setup configures logging output to stderr, sets the log level and sets the formatter. func Setup(logLevel string) error { log.SetOutput(os.Stderr) @@ -24,6 +30,39 @@ func Setup(logLevel string) error { return nil } +// SetupDeduplication should be performed after any other logging setup. +// For all logs less severe or equal to the given log level (but still higher than the logger's configured log level), +// these logs will be 'deduplicated'. What this means is that, excluding certain special fields like time, multiple +// identical log entries will be grouped up and a summary message emitted. +// For example, instead of: +// 00:00:00 INFO User 123 did xyz +// 00:00:10 INFO User 123 did xyz +// 00:00:25 INFO User 123 did xyz +// 00:00:55 INFO User 123 did xyz +// you would get: +// 00:00:00 INFO User 123 did xyz +// 00:01:00 INFO Repeated 3 times: User 123 did xyz +// The interval argument controls how long to wait for additional messages to arrive before reporting. +// Increase it to deduplicate more aggressively, decrease it to lower latency from a log occurring to it appearing. +// Set it to 0 to pick a sensible default value (recommended). +// NOTE: For simplicity and efficiency, fields are considered 'equal' if and only if their string representations (%v) are equal. +func SetupDeduplication(logLevel string, interval time.Duration) error { + dedupeLevel, err := log.ParseLevel(logLevel) + if err != nil { + return fmt.Errorf("Error parsing log level: %v", err) + } + if interval <= 0 { + interval = defaultDedupeInterval + } + + // We use a special Formatter to either format the log using the original formatter, or to return "" + // so nothing will be written for that event. The repeated entries are later logged along with a field flag + // that tells the formatter to ignore the message. + stdLogger := log.StandardLogger() + stdLogger.Formatter = newDedupeFormatter(stdLogger.Formatter, dedupeLevel, interval) + return nil +} + type textFormatter struct{} // Based off logrus.TextFormatter, which behaves completely @@ -35,9 +74,7 @@ func (f *textFormatter) Format(entry *log.Entry) ([]byte, error) { timeStamp := entry.Time.Format("2006/01/02 15:04:05.000000") if len(entry.Data) > 0 { fmt.Fprintf(b, "%s: %s %-44s ", levelText, timeStamp, entry.Message) - for k, v := range entry.Data { - fmt.Fprintf(b, " %s=%v", k, v) - } + b.WriteString(fieldsToString(entry.Data)) } else { // No padding when there's no fields fmt.Fprintf(b, "%s: %s %s", levelText, timeStamp, entry.Message) @@ -55,3 +92,93 @@ func (f *textFormatter) Format(entry *log.Entry) ([]byte, error) { func With(ctx context.Context) *log.Entry { return log.WithFields(user.LogFields(ctx)) } + +type entryCount struct { + entry log.Entry + count int +} + +type dedupeFormatter struct { + innerFormatter log.Formatter + level log.Level + interval time.Duration + seen map[string]entryCount + lock sync.Mutex +} + +func newDedupeFormatter(innerFormatter log.Formatter, level log.Level, interval time.Duration) *dedupeFormatter { + return &dedupeFormatter{ + innerFormatter: innerFormatter, + level: level, + interval: interval, + seen: map[string]entryCount{}, + } +} + +func (f *dedupeFormatter) Format(entry *log.Entry) ([]byte, error) { + if f.shouldLog(entry) { + b, err := f.innerFormatter.Format(entry) + return b, err + } + return []byte{}, nil +} + +func (f *dedupeFormatter) shouldLog(entry *log.Entry) bool { + if _, ok := entry.Data["deduplicated"]; ok { + // ignore our own logs about deduped messages + return true + } + if entry.Level < f.level { + // ignore logs more severe than our level + return true + } + key := fmt.Sprintf("%s %s", entry.Message, fieldsToString(entry.Data)) + f.lock.Lock() + defer f.lock.Unlock() + if ec, ok := f.seen[key]; ok { + // already seen, increment count and do not log + ec.count++ + f.seen[key] = ec + return false + } + // New message, log it but add it to seen. + // We need to copy because the pointer ceases to be valid after we return from Format + f.seen[key] = entryCount{entry: *entry} + go f.evictEntry(key) // queue to evict later + return true +} + +// Wait for interval seconds then evict the entry and send the log +func (f *dedupeFormatter) evictEntry(key string) { + time.Sleep(f.interval) + var ec entryCount + func() { + f.lock.Lock() + defer f.lock.Unlock() + ec = f.seen[key] + delete(f.seen, key) + }() + if ec.count == 0 { + return + } + entry := log.WithFields(ec.entry.Data).WithField("deduplicated", ec.count) + message := fmt.Sprintf("Repeated %d times: %s", ec.count, ec.entry.Message) + // There's no way to choose the log level dynamically, so we have to do this hack + map[log.Level]func(args ...interface{}){ + log.PanicLevel: entry.Panic, + log.FatalLevel: entry.Fatal, + log.ErrorLevel: entry.Error, + log.WarnLevel: entry.Warn, + log.InfoLevel: entry.Info, + log.DebugLevel: entry.Debug, + }[ec.entry.Level](message) +} + +func fieldsToString(data log.Fields) string { + parts := make([]string, 0, len(data)) + // traversal order here is arbitrary but stable, which is fine for our purposes + for k, v := range data { + parts = append(parts, fmt.Sprintf("%s=%v", k, v)) + } + return strings.Join(parts, " ") +} diff --git a/vendor/github.com/weaveworks/common/middleware/grpc_logging.go b/vendor/github.com/weaveworks/common/middleware/grpc_logging.go index 9a781216e3..8788dff569 100644 --- a/vendor/github.com/weaveworks/common/middleware/grpc_logging.go +++ b/vendor/github.com/weaveworks/common/middleware/grpc_logging.go @@ -6,6 +6,8 @@ import ( log "github.com/Sirupsen/logrus" "golang.org/x/net/context" "google.golang.org/grpc" + + "github.com/weaveworks/common/logging" ) const gRPC = "gRPC" @@ -14,10 +16,11 @@ const gRPC = "gRPC" var ServerLoggingInterceptor = func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { begin := time.Now() resp, err := handler(ctx, req) + entry := logging.With(ctx).WithFields(log.Fields{"method": info.FullMethod, "duration": time.Since(begin)}) if err != nil { - log.Warnf("%s %s (%v) %s", gRPC, info.FullMethod, err, time.Since(begin)) + entry.WithError(err).Warn(gRPC) } else { - log.Debugf("%s %s (success) %s", gRPC, info.FullMethod, time.Since(begin)) + entry.Debugf("%s (success)", gRPC) } return resp, err } diff --git a/vendor/github.com/weaveworks/common/server/server.go b/vendor/github.com/weaveworks/common/server/server.go index 6de17b4585..c5b2955399 100644 --- a/vendor/github.com/weaveworks/common/server/server.go +++ b/vendor/github.com/weaveworks/common/server/server.go @@ -40,6 +40,8 @@ type Config struct { HTTPListenPort int GRPCListenPort int + RegisterInstrumentation bool + ServerGracefulShutdownTimeout time.Duration HTTPServerReadTimeout time.Duration HTTPServerWriteTimeout time.Duration @@ -53,6 +55,7 @@ type Config struct { func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.HTTPListenPort, "server.http-listen-port", 80, "HTTP server listen port.") f.IntVar(&cfg.GRPCListenPort, "server.grpc-listen-port", 9095, "gRPC server listen port.") + f.BoolVar(&cfg.RegisterInstrumentation, "server.register-instrumentation", true, "Register the intrumentation handlers (/metrics etc).") f.DurationVar(&cfg.ServerGracefulShutdownTimeout, "server.graceful-shutdown-timeout", 5*time.Second, "Timeout for graceful shutdowns") f.DurationVar(&cfg.HTTPServerReadTimeout, "server.http-read-timeout", 5*time.Second, "Read timeout for HTTP server") f.DurationVar(&cfg.HTTPServerWriteTimeout, "server.http-write-timeout", 5*time.Second, "Write timeout for HTTP server") @@ -111,9 +114,9 @@ func New(cfg Config) (*Server, error) { // Setup HTTP server router := mux.NewRouter() - router.Handle("/metrics", prometheus.Handler()) - router.Handle("/traces", loki.Handler()) - router.PathPrefix("/debug/pprof").Handler(http.DefaultServeMux) + if cfg.RegisterInstrumentation { + RegisterInstrumentation(router) + } httpMiddleware := []middleware.Interface{ middleware.Log{}, middleware.Instrument{ @@ -144,6 +147,13 @@ func New(cfg Config) (*Server, error) { }, nil } +// RegisterInstrumentation on the given router. +func RegisterInstrumentation(router *mux.Router) { + router.Handle("/metrics", prometheus.Handler()) + router.Handle("/traces", loki.Handler()) + router.PathPrefix("/debug/pprof").Handler(http.DefaultServeMux) +} + // Run the server; blocks until SIGTERM is received. func (s *Server) Run() { go s.httpServer.Serve(s.httpListener) diff --git a/vendor/github.com/weaveworks/common/user/id.go b/vendor/github.com/weaveworks/common/user/id.go index ef881132db..a1f91f0c1f 100644 --- a/vendor/github.com/weaveworks/common/user/id.go +++ b/vendor/github.com/weaveworks/common/user/id.go @@ -35,8 +35,8 @@ func ExtractOrgID(ctx context.Context) (string, error) { } // InjectOrgID returns a derived context containing the org ID. -func InjectOrgID(ctx context.Context, userID string) context.Context { - return context.WithValue(ctx, interface{}(orgIDContextKey), userID) +func InjectOrgID(ctx context.Context, orgID string) context.Context { + return context.WithValue(ctx, interface{}(orgIDContextKey), orgID) } // ExtractUserID gets the user ID from the context. diff --git a/vendor/github.com/weaveworks/common/user/logging.go b/vendor/github.com/weaveworks/common/user/logging.go index 1180f1fdfb..aaa29e32a5 100644 --- a/vendor/github.com/weaveworks/common/user/logging.go +++ b/vendor/github.com/weaveworks/common/user/logging.go @@ -10,11 +10,11 @@ import ( func LogFields(ctx context.Context) log.Fields { fields := log.Fields{} userID, err := ExtractUserID(ctx) - if err != nil { + if err == nil { fields["userID"] = userID } orgID, err := ExtractOrgID(ctx) - if err != nil { + if err == nil { fields["orgID"] = orgID } return fields diff --git a/vendor/manifest b/vendor/manifest index a595c8e7e7..64588b4fc9 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -980,7 +980,7 @@ "importpath": "github.com/weaveworks/common", "repository": "https://github.com/weaveworks/common", "vcs": "git", - "revision": "493a1f760f47ed3b50afd5baabb36589d96017b8", + "revision": "b811bc96d43d51edbae6693e7d1b0a367114595b", "branch": "master", "notests": true },