Skip to content

Commit 50745d3

Browse files
committed
first version: e2e log sync tool
1 parent 5c65f66 commit 50745d3

File tree

4 files changed

+269
-1
lines changed

4 files changed

+269
-1
lines changed

Tiltfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ def deploy_observability():
330330

331331
if "loki" in settings.get("deploy_observability", []):
332332
k8s_yaml(read_file("./.tiltbuild/yaml/loki.observability.yaml"), allow_duplicates = True)
333-
k8s_resource(workload = "loki", extra_pod_selectors = [{"app": "loki"}], labels = ["observability"])
333+
k8s_resource(workload = "loki", port_forwards = "3100:3100", extra_pod_selectors = [{"app": "loki"}], labels = ["observability"])
334334

335335
if "grafana" in settings.get("deploy_observability", []):
336336
k8s_yaml(read_file("./.tiltbuild/yaml/grafana.observability.yaml"), allow_duplicates = True)

hack/observability/grafana/values.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Configuration for grafana chart, see https://github.com/grafana/helm-charts/tree/main/charts/grafana
22

3+
# Set a password explicitly to avoid infinite tilt reloads because
4+
# of a random password.
5+
adminPassword: admin
6+
37
grafana.ini:
48
# Disable the grafana login form.
59
auth:

hack/tools/e2e-log-sync/README.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
2+
# E2e log sync
3+
4+
## Prerequisites
5+
6+
Start the Tilt development environment via `tilt up`.
7+
8+
*Notes*:
9+
* If you only want to see imported logs, disable promtail.
10+
* If you want to drop all logs from Loki, just delete the Loki Pod in the `observability` namespace.
11+
12+
## Import logs
13+
14+
Imports logs into Loki:
15+
```bash
16+
go run ./hack/tools/e2e-log-sync --bucket=kubernetes-jenkins --controller-folder=pr-logs/pull/kubernetes-sigs_cluster-api/6150/pull-cluster-api-e2e-main/1496099075710259200/artifacts/clusters/bootstrap/controllers
17+
```
18+
19+
## View logs
20+
21+
Now the logs are available:
22+
* via Grafana: `http://localhost:3001/explore`
23+
* via the Loki `logcli`:
24+
```bash
25+
logcli query '{app="capd-controller-manager"}' --timezone=UTC --from="2022-02-22T10:00:00Z"
26+
```
27+
28+
## Caveats
29+
30+
* Make sure you query the correct time range.
31+
* Right now it takes ~5m until the logs can be queried in Loki
32+
* Maybe it's https://community.grafana.com/t/how-to-build-a-loki-cluster-and-query-logs-instantly/45995/5
33+
* TODO check tail --delay-for

hack/tools/e2e-log-sync/main.go

Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
//go:build tools
2+
// +build tools
3+
4+
/*
5+
Copyright 2022 The Kubernetes Authors.
6+
7+
Licensed under the Apache License, Version 2.0 (the "License");
8+
you may not use this file except in compliance with the License.
9+
You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing, software
14+
distributed under the License is distributed on an "AS IS" BASIS,
15+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
See the License for the specific language governing permissions and
17+
limitations under the License.
18+
*/
19+
20+
package main
21+
22+
import (
23+
"bytes"
24+
"context"
25+
"encoding/json"
26+
"flag"
27+
"fmt"
28+
"io/ioutil"
29+
"net/http"
30+
"os"
31+
"strconv"
32+
"strings"
33+
"time"
34+
35+
"cloud.google.com/go/storage"
36+
"github.com/valyala/fastjson"
37+
"google.golang.org/api/iterator"
38+
"google.golang.org/api/option"
39+
"k8s.io/klog/v2"
40+
)
41+
42+
var (
43+
// FIXME
44+
bucket = flag.String("bucket", "kubernetes-jenkins", "Bucket to download the logs from.")
45+
controllerFolder = flag.String("controller-folder", "pr-logs/pull/kubernetes-sigs_cluster-api/6150/pull-cluster-api-e2e-main/1496099075710259200/artifacts/clusters/bootstrap/controllers", "Folder to get the controller-logs from.")
46+
lokiURL = flag.String("loki-url", "http://localhost:3100/loki/api/v1/push", "URL to push the logs to.")
47+
)
48+
49+
func main() {
50+
flag.Parse()
51+
52+
if err := run(); err != nil {
53+
fmt.Printf("Error occured: %v\n", err)
54+
os.Exit(1)
55+
}
56+
}
57+
58+
type logData struct {
59+
logFile string
60+
logMetadata string
61+
}
62+
63+
func run() error {
64+
ctx := context.Background()
65+
client, err := storage.NewClient(ctx, option.WithoutAuthentication())
66+
if err != nil {
67+
return fmt.Errorf("storage.NewClient: %v", err)
68+
}
69+
defer client.Close()
70+
71+
logs, err := getLogData(ctx, client, *bucket, *controllerFolder)
72+
73+
for _, ld := range logs {
74+
logFile, err := downloadFile(ctx, client, *bucket, ld.logFile)
75+
if err != nil {
76+
return err
77+
}
78+
logMetadata, err := downloadFile(ctx, client, *bucket, ld.logMetadata)
79+
if err != nil {
80+
return err
81+
}
82+
83+
metadata := map[string]string{}
84+
if err := json.Unmarshal(logMetadata, &metadata); err != nil {
85+
return err
86+
}
87+
88+
klog.Infof("Uploading logs from: %s", ld.logFile)
89+
// FIXME: batch log lines. Just pushing individual lines for now for debugging.
90+
for _, logLine := range strings.Split(string(logFile), "\n") {
91+
streams := Streams{}
92+
93+
lineMetadata := map[string]string{}
94+
for k, v := range metadata {
95+
lineMetadata[k] = v
96+
}
97+
98+
s := Stream{
99+
Stream: lineMetadata,
100+
}
101+
102+
parsedLogLine, err := fastjson.Parse(logLine)
103+
if err != nil {
104+
continue
105+
}
106+
if !parsedLogLine.Exists("ts") {
107+
continue
108+
}
109+
110+
tsMilli, err := parsedLogLine.Get("ts").Float64()
111+
if err != nil {
112+
return err
113+
}
114+
tsNano := tsMilli * 1000 * 1000
115+
116+
if parsedLogLine.Exists("cluster") {
117+
cluster := parsedLogLine.Get("cluster").String()
118+
cluster, err = strconv.Unquote(cluster)
119+
if err != nil {
120+
return err
121+
}
122+
s.Stream["cluster"] = cluster
123+
}
124+
if parsedLogLine.Exists("machine") {
125+
machine := parsedLogLine.Get("machine").String()
126+
machine, err = strconv.Unquote(machine)
127+
if err != nil {
128+
return err
129+
}
130+
s.Stream["machine"] = machine
131+
}
132+
133+
s.Values = append(s.Values, []string{
134+
strconv.Itoa(int(tsNano)),
135+
logLine,
136+
})
137+
138+
streams.Streams = append(streams.Streams, s)
139+
140+
body, err := json.Marshal(streams)
141+
if err != nil {
142+
return err
143+
}
144+
145+
// See: https://grafana.com/docs/loki/latest/api/#post-lokiapiv1push
146+
req, err := http.NewRequest(http.MethodPost, *lokiURL, bytes.NewBuffer(body))
147+
if err != nil {
148+
return err
149+
}
150+
req.Header.Set("Content-Type", "application/json")
151+
152+
resp, err := http.DefaultClient.Do(req)
153+
if err != nil {
154+
return err
155+
}
156+
157+
fmt.Println(resp.StatusCode)
158+
respBody, err := ioutil.ReadAll(resp.Body)
159+
if err != nil {
160+
return err
161+
}
162+
// move this into separate func
163+
if err := resp.Body.Close(); err != nil {
164+
return err
165+
}
166+
fmt.Println(string(respBody))
167+
}
168+
}
169+
170+
return nil
171+
}
172+
173+
type Streams struct {
174+
Streams []Stream `json:"streams"`
175+
}
176+
177+
type Stream struct {
178+
Stream map[string]string `json:"stream"`
179+
Values [][]string `json:"values"`
180+
}
181+
182+
func getLogData(ctx context.Context, client *storage.Client, bucket, controllerFolder string) (map[string]logData, error) {
183+
data := map[string]logData{}
184+
query := &storage.Query{
185+
Prefix: controllerFolder,
186+
Delimiter: "",
187+
}
188+
it := client.Bucket(bucket).Objects(ctx, query)
189+
for {
190+
attrs, err := it.Next()
191+
if err != nil {
192+
if err == iterator.Done {
193+
break
194+
}
195+
return nil, err
196+
}
197+
if !strings.HasSuffix(attrs.Name, "manager.log") && !strings.HasSuffix(attrs.Name, "manager.json") {
198+
continue
199+
}
200+
dir := attrs.Name[:strings.LastIndex(attrs.Name, "/")]
201+
202+
ld, _ := data[dir]
203+
if strings.HasSuffix(attrs.Name, "manager.log") {
204+
ld.logFile = attrs.Name
205+
} else {
206+
ld.logMetadata = attrs.Name
207+
}
208+
data[dir] = ld
209+
fmt.Println(attrs.Prefix + attrs.Name)
210+
}
211+
212+
return data, nil
213+
}
214+
215+
// downloadFile downloads an object.
216+
func downloadFile(ctx context.Context, client *storage.Client, bucket, object string) ([]byte, error) {
217+
ctx, cancel := context.WithTimeout(ctx, time.Minute)
218+
defer cancel()
219+
220+
rc, err := client.Bucket(bucket).Object(object).NewReader(ctx)
221+
if err != nil {
222+
return nil, fmt.Errorf("Object(%q).NewReader: %v", object, err)
223+
}
224+
defer rc.Close()
225+
226+
data, err := ioutil.ReadAll(rc)
227+
if err != nil {
228+
return nil, fmt.Errorf("ioutil.ReadAll: %v", err)
229+
}
230+
return data, nil
231+
}

0 commit comments

Comments
 (0)