Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 87 additions & 93 deletions src/gprofiler_flamedb_rest/db/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,15 +347,15 @@ func (c *ClickHouseClient) GetTopFrames(ctx context.Context, params common.Flame
tableNew := getTableName(table, tablePrefix)
go func(sTable string, sStart string, sEnd string) {
defer wg.Done()
query := fmt.Sprintf(`
query := `
SELECT CallStackHash, any(CallStackName), any(CallStackParent), sum(NumSamples)
AS SumNumSamples FROM %s
WHERE ServiceId == '%d' AND (Timestamp BETWEEN '%s' AND '%s') %s
AS SumNumSamples FROM ?
WHERE ServiceId == '?' AND (Timestamp BETWEEN '?' AND '?') ?
GROUP BY CallStackHash
ORDER BY SumNumSamples DESC
LIMIT %d`, sTable, params.ServiceId, sStart, sEnd, conditions, params.StacksNum)
log.Printf("SELECT query: %s", query)
rows, err := c.client.Query(query)
LIMIT ?`
log.Printf("SELECT query: ?", query)
rows, err := c.client.Query(query, sTable, params.ServiceId, sStart, sEnd, conditions, params.StacksNum)
if err == nil {
defer rows.Close()
frames := make(map[uint64]Frame)
Expand Down Expand Up @@ -401,13 +401,10 @@ func (c *ClickHouseClient) FetchInstanceTypeCount(ctx context.Context, params co
_, conditions := BuildConditions(params.ContainerName, params.HostName, params.InstanceType, params.K8SObject, filterQuery)
selectQuery = `
SELECT InstanceType, COUNT(DISTINCT HostName) as InstanceCount
FROM flamedb.samples_1min where ServiceId = '%d' AND (Timestamp BETWEEN '%s' AND '%s' )
%s GROUP BY InstanceType ORDER BY InstanceCount DESC`

query := fmt.Sprintf(selectQuery, params.ServiceId, common.FormatTime(params.StartDateTime),
FROM flamedb.samples_1min where ServiceId = '?' AND (Timestamp BETWEEN '?' AND '?' )
? GROUP BY InstanceType ORDER BY InstanceCount DESC`
rows, err := c.client.Query(selectQuery, params.ServiceId, common.FormatTime(params.StartDateTime),
common.FormatTime(params.EndDateTime), conditions)

rows, err := c.client.Query(query)
if err == nil {
defer rows.Close()
for rows.Next() {
Expand All @@ -433,12 +430,10 @@ func (c *ClickHouseClient) FetchFieldValueSample(ctx context.Context, field stri
result := make([]common.FilterData, 0)
_, conditions := BuildConditions(params.ContainerName, params.HostName, params.InstanceType, params.K8SObject, filterQuery)
selectQuery = `
SELECT %s, SUM(NumSamples) as samples from flamedb.samples_1min WHERE ServiceId == '%d' AND
(Timestamp BETWEEN '%s' AND '%s') %s GROUP BY %s ORDER BY samples DESC;`
query := fmt.Sprintf(selectQuery, field, params.ServiceId, common.FormatTime(params.StartDateTime),
SELECT ?, SUM(NumSamples) as samples from flamedb.samples_1min WHERE ServiceId == '?' AND
(Timestamp BETWEEN '?' AND '?') ? GROUP BY ? ORDER BY samples DESC;`
rows, err := c.client.Query(selectQuery,field, params.ServiceId, common.FormatTime(params.StartDateTime),
common.FormatTime(params.EndDateTime), conditions, field)

rows, err := c.client.Query(query)
if err == nil {
defer rows.Close()
for rows.Next() {
Expand All @@ -463,13 +458,13 @@ func (c *ClickHouseClient) FetchFieldValues(ctx context.Context, field string, p
filterQuery string) []common.FilterData {
result := make([]common.FilterData, 0)
_, conditions := BuildConditions(params.ContainerName, params.HostName, params.InstanceType, params.K8SObject, filterQuery)
query := fmt.Sprintf(`
SELECT %s from flamedb.samples_1min WHERE ServiceId == '%d' AND
(Timestamp BETWEEN '%s' AND '%s') %s GROUP BY %s;
`, field, params.ServiceId, common.FormatTime(params.StartDateTime),
common.FormatTime(params.EndDateTime), conditions, field)
query := `
SELECT ? from flamedb.samples_1min WHERE ServiceId == '?' AND
(Timestamp BETWEEN '?' AND '?') ? GROUP BY ?;
`

rows, err := c.client.Query(query)
rows, err := c.client.Query(query, field, params.ServiceId, common.FormatTime(params.StartDateTime),
common.FormatTime(params.EndDateTime), conditions, field)
if err == nil {
defer rows.Close()
for rows.Next() {
Expand All @@ -494,15 +489,15 @@ func (c *ClickHouseClient) FetchSampleCount(ctx context.Context, params common.Q
_, conditions := BuildConditions(params.ContainerName, params.HostName, params.InstanceType, params.K8SObject, filterQuery)
interval := getInterval(params.StartDateTime, params.EndDateTime, params.Interval)
result := make([]common.Sample, 0)
query := fmt.Sprintf(`
SELECT toStartOfInterval(Timestamp, INTERVAL '%s') as Datetime, SUM(NumSamples)
query := `
SELECT toStartOfInterval(Timestamp, INTERVAL '?') as Datetime, SUM(NumSamples)
FROM flamedb.samples_1min
WHERE ServiceId == '%d' AND (Timestamp BETWEEN '%s' AND '%s') %s
WHERE ServiceId == '?' AND (Timestamp BETWEEN '?' AND '?') ?
GROUP BY Datetime
ORDER BY Datetime DESC;
`, interval, params.ServiceId, common.FormatTime(params.StartDateTime),
`
rows, err := c.client.Query(query, interval, params.ServiceId, common.FormatTime(params.StartDateTime),
common.FormatTime(params.EndDateTime), conditions)
rows, err := c.client.Query(query)
if err == nil {
defer rows.Close()
for rows.Next() {
Expand Down Expand Up @@ -530,29 +525,28 @@ func (c *ClickHouseClient) FetchSampleCountByFunction(ctx context.Context, param
interval = "1 minute"
}
result := make([]common.SamplesCountByFunction, 0)
query := fmt.Sprintf(`
query := `
WITH all_samples as(
SELECT toStartOfInterval(Timestamp, INTERVAL '%s') AS Datetime, SUM(NumSamples) AS sum_cpu
SELECT toStartOfInterval(Timestamp, INTERVAL '?') AS Datetime, SUM(NumSamples) AS sum_cpu
FROM flamedb.samples_1min
WHERE ServiceId == '%d' AND (Timestamp BETWEEN '%s' AND '%s') %s
WHERE ServiceId == '?' AND (Timestamp BETWEEN '?' AND '?') ?
GROUP BY Datetime
ORDER BY Datetime DESC
), function_samples AS (
SELECT toStartOfInterval(Timestamp, INTERVAL '%s') AS Datetime, SUM(NumSamples) AS sum_cpu
SELECT toStartOfInterval(Timestamp, INTERVAL '?') AS Datetime, SUM(NumSamples) AS sum_cpu
FROM flamedb.samples
WHERE ServiceId == '%d' AND (Timestamp BETWEEN '%s' AND '%s') AND (CallStackName = '%s') %s
WHERE ServiceId == '?' AND (Timestamp BETWEEN '?' AND '?') AND (CallStackName = '?') ?
GROUP BY Datetime
ORDER BY Datetime DESC
)

SELECT (function_samples.sum_cpu/all_samples.sum_cpu) AS Samples , all_samples.Datetime AS Datetime
FROM all_samples
LEFT JOIN function_samples ON function_samples.Datetime = all_samples.Datetime;
`, interval, params.ServiceId, common.FormatTime(params.StartDateTime),
`
rows, err := c.client.Query(query, interval, params.ServiceId, common.FormatTime(params.StartDateTime),
common.FormatTime(params.EndDateTime), conditions, interval, params.ServiceId, common.FormatTime(params.StartDateTime),
common.FormatTime(params.EndDateTime), params.FunctionName, conditions)

rows, err := c.client.Query(query)
if err == nil {
defer rows.Close()

Expand Down Expand Up @@ -601,14 +595,14 @@ func (c *ClickHouseClient) FetchTimes(ctx context.Context, params common.QueryPa
default:
interval = getInterval(params.StartDateTime, params.EndDateTime, "")
}
query := fmt.Sprintf(`
SELECT toStartOfInterval(Timestamp, INTERVAL '%s') as Datetime
from flamedb.samples_1min WHERE ServiceId == '%d' AND
(Timestamp BETWEEN '%s' AND '%s') %s
query := `
SELECT toStartOfInterval(Timestamp, INTERVAL '?') as Datetime
from flamedb.samples_1min WHERE ServiceId == '?' AND
(Timestamp BETWEEN '?' AND '?') ?
GROUP BY Datetime
ORDER BY Datetime DESC;`, interval, params.ServiceId,
ORDER BY Datetime DESC;`
rows, err := c.client.Query(query, interval, params.ServiceId,
common.FormatTime(params.StartDateTime), common.FormatTime(params.EndDateTime), conditions)
rows, err := c.client.Query(query)
if err == nil {
defer rows.Close()
for rows.Next() {
Expand All @@ -631,13 +625,13 @@ func (c *ClickHouseClient) FetchTimeRange(ctx context.Context, params common.Que
result := make([]string, 0)
_, conditions := BuildConditions(params.ContainerName, params.HostName, params.InstanceType, params.K8SObject, filterQuery)

query := fmt.Sprintf(`
query := `
SELECT min(Timestamp), max(Timestamp)
from flamedb.samples_1min WHERE
ServiceId == '%d' AND
(Timestamp BETWEEN '%s' AND '%s') %s;`, params.ServiceId,
ServiceId == '?' AND
(Timestamp BETWEEN '?' AND '?') ?;`
rows, err := c.client.Query(query, params.ServiceId,
common.FormatTime(params.StartDateTime), common.FormatTime(params.EndDateTime), conditions)
rows, err := c.client.Query(query)
if err == nil {
defer rows.Close()
for rows.Next() {
Expand Down Expand Up @@ -667,18 +661,18 @@ func (c *ClickHouseClient) FetchMetricsSummary(ctx context.Context, params commo
_, conditions := BuildConditions(defaultEmptyList, params.HostName, params.InstanceType, defaultEmptyList, filterQuery)

percentile := float64(params.Percentile) / 100.0
query := fmt.Sprintf(`
query := `
SELECT arrayAvg(flatten(groupArray(CPUArray))), MAX(MaxCPU),
AVG(MaxMemory), MAX(MaxMemory), quantile(%f)(MaxMemory), count() FROM
AVG(MaxMemory), MAX(MaxMemory), quantile(?)(MaxMemory), count() FROM
(SELECT
MAX(MemoryAverageUsedPercent) AS MaxMemory,
MAX(CPUAverageUsedPercent) as MaxCPU,
groupArray(CPUAverageUsedPercent) as CPUArray
FROM %s
WHERE ServiceId = %d AND (Timestamp BETWEEN '%s' AND '%s') %s
GROUP BY HostName)`, percentile, config.ClickHouseMetricsTable, params.ServiceId,
FROM ?
WHERE ServiceId = ? AND (Timestamp BETWEEN '?' AND '?') ?
GROUP BY HostName)`
rows, err := c.client.Query(query, percentile, config.ClickHouseMetricsTable, params.ServiceId,
common.FormatTime(params.StartDateTime), common.FormatTime(params.EndDateTime), conditions)
rows, err := c.client.Query(query)
if err == nil {
defer func(rows *sql.Rows) {
err := rows.Close()
Expand Down Expand Up @@ -722,31 +716,31 @@ func (c *ClickHouseClient) FetchMetricsServicesListSummary(ctx context.Context,
formattedServicesList := joinIntSlice(params.ServicesList, ",")
percentile := float64(params.Percentile) / 100.0

query := fmt.Sprintf(`
query := `
WITH LatestServices AS (
SELECT
ServiceId as s_id, max(Timestamp) as last_seen
FROM %s
WHERE ServiceId in (%s) AND (Timestamp BETWEEN '%s' AND '%s')
FROM ?
WHERE ServiceId in (?) AND (Timestamp BETWEEN '?' AND '?')
GROUP BY ServiceId
), GroupedMetrics AS (
SELECT
ServiceId,
max(MemoryAverageUsedPercent) AS MaxMemory,
max(CPUAverageUsedPercent) as MaxCPU,
groupArray(CPUAverageUsedPercent) as CPUArray
FROM %s
FROM ?
GLOBAL JOIN LatestServices ON ServiceId = s_id
WHERE ServiceId in (%s) AND (Timestamp BETWEEN last_seen - toIntervalHour(24) AND last_seen)
WHERE ServiceId in (?) AND (Timestamp BETWEEN last_seen - toIntervalHour(24) AND last_seen)
GROUP BY HostName, ServiceId
)
SELECT arrayAvg(flatten(groupArray(CPUArray))), max(MaxCPU), ServiceId,
avg(MaxMemory), max(MaxMemory), quantile(%f)(MaxMemory), count()
avg(MaxMemory), max(MaxMemory), quantile(?)(MaxMemory), count()
FROM GroupedMetrics
GROUP BY ServiceId`, config.ClickHouseMetricsTable, formattedServicesList,
GROUP BY ServiceId`
rows, err := c.client.Query(query, config.ClickHouseMetricsTable, formattedServicesList,
common.FormatTime(params.StartDateTime), common.FormatTime(params.EndDateTime),
config.ClickHouseMetricsTable, formattedServicesList, percentile)
rows, err := c.client.Query(query)

var results []common.MetricsServicesListSummary

Expand Down Expand Up @@ -806,21 +800,21 @@ func (c *ClickHouseClient) FetchMetricsGraph(ctx context.Context, params common.
if params.GroupBy != "none" {
groupBy = fmt.Sprintf(", %s", params.GroupBy)
}
query := fmt.Sprintf(`
SELECT Datetime %s, arrayAvg(flatten(groupArray(CPUArray))), MAX(MaxCPU),
AVG(MaxMemory), MAX(MaxMemory), quantile(%f)(MaxMemory) FROM
(SELECT toStartOfInterval(Timestamp, INTERVAL '%s') as
Datetime %s,
query := `
SELECT Datetime ?, arrayAvg(flatten(groupArray(CPUArray))), MAX(MaxCPU),
AVG(MaxMemory), MAX(MaxMemory), quantile(?)(MaxMemory) FROM
(SELECT toStartOfInterval(Timestamp, INTERVAL '?') as
Datetime ?,
HostName,
MAX(MemoryAverageUsedPercent) AS MaxMemory,
MAX(CPUAverageUsedPercent) as MaxCPU,
groupArray(CPUAverageUsedPercent) as CPUArray
FROM %s
WHERE ServiceId = %d AND (Datetime BETWEEN '%s' AND '%s') %s
GROUP BY Datetime %s, HostName) GROUP BY Datetime %s ORDER BY Datetime DESC;
`, groupBy, percentile, interval, groupBy, config.ClickHouseMetricsTable, params.ServiceId,
FROM ?
WHERE ServiceId = ? AND (Datetime BETWEEN '?' AND '?') ?
GROUP BY Datetime ?, HostName) GROUP BY Datetime ? ORDER BY Datetime DESC;
`
rows, err := c.client.Query(query, groupBy, percentile, interval, groupBy, config.ClickHouseMetricsTable, params.ServiceId,
common.FormatTime(params.StartDateTime), common.FormatTime(params.EndDateTime), conditions, groupBy, groupBy)
rows, err := c.client.Query(query)
if err == nil {
defer func(rows *sql.Rows) {
err := rows.Close()
Expand Down Expand Up @@ -869,7 +863,7 @@ func (c *ClickHouseClient) FetchMetricsCpuTrend(ctx context.Context, params comm
finalResult := common.MetricsCpuTrend{}
_, conditions := BuildConditions(defaultEmptyList, params.HostName, params.InstanceType, defaultEmptyList, filterQuery)

query := fmt.Sprintf(`
query := `
WITH CURRENT_CONSUMPTION AS (
SELECT
arrayAvg(flatten(groupArray(CPUArray))) AS avg_cpu,
Expand All @@ -882,8 +876,8 @@ func (c *ClickHouseClient) FetchMetricsCpuTrend(ctx context.Context, params comm
MAX(MemoryAverageUsedPercent) AS MaxMemory,
MAX(CPUAverageUsedPercent) as MaxCPU,
groupArray(CPUAverageUsedPercent) as CPUArray
FROM %s
WHERE ServiceId = %d AND (Timestamp BETWEEN '%s' AND '%s') %s
FROM ?
WHERE ServiceId = ? AND (Timestamp BETWEEN '?' AND '?') ?
GROUP BY HostName)
),
PREVIOUS_CONSUMPTION AS (
Expand All @@ -898,22 +892,22 @@ func (c *ClickHouseClient) FetchMetricsCpuTrend(ctx context.Context, params comm
MAX(MemoryAverageUsedPercent) AS MaxMemory,
MAX(CPUAverageUsedPercent) as MaxCPU,
groupArray(CPUAverageUsedPercent) as CPUArray
FROM %s
WHERE ServiceId = %d AND (Timestamp BETWEEN '%s' AND '%s') %s
FROM ?
WHERE ServiceId = ? AND (Timestamp BETWEEN '?' AND '?') ?
GROUP BY HostName)
)
SELECT avg_cpu, max_cpu, avg_memory, max_memory
FROM (
SELECT * FROM CURRENT_CONSUMPTION
UNION ALL
SELECT * FROM PREVIOUS_CONSUMPTION)
order by SortOrder`, config.ClickHouseMetricsTable, params.ServiceId,
order by SortOrder`

first := true
rows, err := c.client.Query(query, config.ClickHouseMetricsTable, params.ServiceId,
common.FormatTime(params.StartDateTime), common.FormatTime(params.EndDateTime), conditions,
config.ClickHouseMetricsTable, params.ServiceId,
common.FormatTime(params.ComparedStartDateTime), common.FormatTime(params.ComparedEndDateTime), conditions)

first := true
rows, err := c.client.Query(query)
if err == nil {
defer func(rows *sql.Rows) {
err := rows.Close()
Expand Down Expand Up @@ -966,10 +960,10 @@ func (c *ClickHouseClient) FetchServices(ctx context.Context, params common.Serv
expr = "(ServiceId,ContainerEnvName)"
groupByExpr = "GROUP BY (ServiceId,ContainerEnvName)"
}
query := fmt.Sprintf(`
SELECT %s from flamedb.samples_1min WHERE (Timestamp BETWEEN '%s' AND '%s') %s;
`, expr, common.FormatTime(params.StartDateTime), common.FormatTime(params.EndDateTime), groupByExpr)
rows, err := c.client.Query(query)
query := `
SELECT ? from flamedb.samples_1min WHERE (Timestamp BETWEEN '?' AND '?') ?;
`
rows, err := c.client.Query(query, expr, common.FormatTime(params.StartDateTime), common.FormatTime(params.EndDateTime), groupByExpr)
result := make([]SrvResp, 0)
if err == nil {
defer rows.Close()
Expand Down Expand Up @@ -999,12 +993,12 @@ func (c *ClickHouseClient) FetchSessionsCount(ctx context.Context, params common
filterQuery string) (int, error) {
_, conditions := BuildConditions(params.ContainerName, params.HostName, params.InstanceType, params.K8SObject, filterQuery)

query := fmt.Sprintf(`
SELECT uniq(HostName,Timestamp) FROM flamedb.samples_1min WHERE ServiceId = %d AND
(Timestamp BETWEEN '%s' AND '%s') %s;
`, params.ServiceId, common.FormatTime(params.StartDateTime),
query := `
SELECT uniq(HostName,Timestamp) FROM flamedb.samples_1min WHERE ServiceId = ? AND
(Timestamp BETWEEN '?' AND '?') ?;
`
rows, err := c.client.Query(query, params.ServiceId, common.FormatTime(params.StartDateTime),
common.FormatTime(params.EndDateTime), conditions)
rows, err := c.client.Query(query)
if err == nil {
defer rows.Close()
for rows.Next() {
Expand All @@ -1025,11 +1019,11 @@ func (c *ClickHouseClient) FetchSessionsCount(ctx context.Context, params common
func (c *ClickHouseClient) FetchLastHTML(ctx context.Context, params common.MetricsLastHTMLParams,
filterQuery string) (string, error) {
_, conditions := BuildConditions(params.ContainerName, params.HostName, params.InstanceType, params.K8SObject, filterQuery)
query := fmt.Sprintf(`
SELECT argMax(HTMLPath,Timestamp) FROM flamedb.metrics WHERE ServiceId = %d AND
(Timestamp BETWEEN '%s' AND '%s') %s;
`, params.ServiceId, common.FormatTime(params.StartDateTime), common.FormatTime(params.EndDateTime), conditions)
rows, err := c.client.Query(query)
query := `
SELECT argMax(HTMLPath,Timestamp) FROM flamedb.metrics WHERE ServiceId = ? AND
(Timestamp BETWEEN '?' AND '?') ?;
`
rows, err := c.client.Query(query, params.ServiceId, common.FormatTime(params.StartDateTime), common.FormatTime(params.EndDateTime), conditions)
if err == nil {
defer rows.Close()
for rows.Next() {
Expand Down