Improve throughput through sharding fingerprints #3097


Closed
wants to merge 4 commits

Conversation


@storyicon storyicon commented Aug 26, 2020

What this PR does:
This PR is expected to increase the throughput of the ingester (chunks storage) by 10-100x.

Which issue(s) this PR fixes:
Fixes #3093

According to the issue description, Lock and Append take up 95% of the entire Push request time.
The time spent in Lock is affected by the level of concurrency, but fundamentally it is determined by how long the code it protects takes to run.
So the key is to reduce the time spent in copy. As any data-structures beginner knows, random insertion into an array is expensive because it requires moving a large number of elements.
To be honest, my first thought was a skiplist, because the scenario here is very similar to Redis's sorted set. In comparative tests, the random insertion speed of a skiplist is indeed much better than that of an array, but when I actually integrated it into Cortex and ran it in my environment, it used too much memory, so I eventually went back to the array.
My approach is to split one big ordered slice into multiple ordered slices to reduce the cost of each copy. It sounds simple, but it is effective and well suited here.

Its general idea is as follows:
It divides the fingerprint value range, [0, math.MaxUint64], into N shards:

shards[0]: [0, math.MaxUint64 * 1/N)
shards[1]: [math.MaxUint64 * 1/N, math.MaxUint64 * 2/N)
...
shards[N-1]: [math.MaxUint64 * (N-1)/N, math.MaxUint64]

(the actual shard boundaries are not exactly this precise).

Fingerprints within each shard are kept in order, so each shard can be handled exactly as the single large array of fingerprints was handled before. This adds a small cost (computing the shard), but greatly improves throughput, and it even helps reduce memory consumption to some extent (a single large slice allocates more memory as it grows).
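
To make the idea concrete, here is a minimal sketch of the shard selection; shardOf and numShards are illustrative names rather than the identifiers used in the PR, and the clamp at the top of the range is discussed in the review below:

package main

import (
	"fmt"
	"math"

	"github.com/prometheus/common/model"
)

const numShards = 4

// shardOf maps a fingerprint to a shard by its position within [0, math.MaxUint64].
func shardOf(fp model.Fingerprint) int {
	n := int(float64(numShards) * float64(fp) / math.MaxUint64)
	if n == numShards { // fingerprints at the very top of the range round up past the last shard
		n = numShards - 1
	}
	return n
}

func main() {
	fmt.Println(shardOf(0))                          // 0
	fmt.Println(shardOf(model.Fingerprint(1) << 63)) // 2
	fmt.Println(shardOf(math.MaxUint64))             // 3
}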

Comparison:
[screenshot]

In my test, each consumer could only consume up to 25,000 series/s before the modification, and up to 1,400,000 series/s after it. If you have doubts about these numbers, I describe the testing process below.


storyicon commented Aug 26, 2020

For testing, I created a struct InvertedIndexV2 for the modified InvertedIndex and initialized it with NewV2.

To avoid other interference, I modified the Push method of Distributor directly:

// deduplicator: a set of model.Fingerprint values already added to the index
var fpMap sync.Map
var invertedIndex = index.New() // when testing the sharded version, change this to index.NewV2

func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
	for _, ts := range req.Timeseries {
		fp := client.FastFingerprint(ts.Labels)
		_, exists := fpMap.LoadOrStore(fp, struct{}{})
		if !exists {
			invertedIndex.Add(ctx, ts.Labels, fp)
		}
	}
	return &client.WriteResponse{}, nil
}

Below is the magnitude of my data:

  1. Normally, it is about 830,000 timeseries/s.
  2. Each series contains only one sample and about 10 labels in each request.
  3. The cardinality of some labels is relatively large.
  4. Prometheus builds up a large backlog (latency) during the test, in order to measure the maximum throughput capacity.

The data is not extreme; it comes from our real production environment (from a large number of node_exporters).

@storyicon
Contributor Author

Here's a simple test you can run out of the box:

package main

import (
	"fmt"
	"math"
	"math/rand"
	"sort"
	"time"

	"github.com/prometheus/common/model"
)

func GenRandomFingerprint(size int) []model.Fingerprint {
	fps := make([]model.Fingerprint, size)
	for i := 0; i < size; i++ {
		fps[i] = model.Fingerprint(rand.Uint64())
	}
	return fps
}

type IndexValueEntry struct {
	fps []model.Fingerprint
}

func NewV1() *IndexValueEntry {
	return &IndexValueEntry{}
}

// add inserts fp into the single sorted slice: binary search for the position,
// then shift everything after it one slot to the right.
func (s *IndexValueEntry) add(fp model.Fingerprint) {
	j := sort.Search(len(s.fps), func(i int) bool {
		return s.fps[i] >= fp
	})
	s.fps = append(s.fps, 0)
	copy(s.fps[j+1:], s.fps[j:])
	s.fps[j] = fp
}

type IndexValueEntryV2 struct {
	shards [][]model.Fingerprint
}

func NewV2() *IndexValueEntryV2 {
	shards := make([][]model.Fingerprint, 200)
	return &IndexValueEntryV2{
		shards: shards,
	}
}

// add picks the shard that covers fp's slice of the keyspace and does the same
// sorted insert there, so only that shard's elements need to be shifted.
func (s *IndexValueEntryV2) add(fp model.Fingerprint) {
	num := int(math.Floor(float64(len(s.shards)) * float64(fp) / math.MaxUint64))
	if num == len(s.shards) { // fingerprints near math.MaxUint64 round up past the last shard
		num = len(s.shards) - 1
	}
	fps := s.shards[num]
	j := sort.Search(len(fps), func(i int) bool {
		return fps[i] >= fp
	})
	fps = append(fps, 0)
	copy(fps[j+1:], fps[j:])
	fps[j] = fp
	s.shards[num] = fps
}

type Indexer interface {
	add(fp model.Fingerprint)
}

func TestIndexer(indexer Indexer, initSize int, insertSize int) time.Duration {
	fps := GenRandomFingerprint(initSize)
	for _, fp := range fps {
		indexer.add(fp)
	}

	fps = GenRandomFingerprint(insertSize)
	return Elapsed(func() {
		for _, fp := range fps {
			indexer.add(fp)
		}
	})
}

func Elapsed(fn func()) time.Duration {
	t := time.Now()
	fn()
	return time.Since(t)
}

func main() {
	var (
		initSize   = 100000
		insertSize = 10000
	)
	fmt.Println("V2:", TestIndexer(NewV2(), initSize, insertSize))
	fmt.Println("V1:", TestIndexer(NewV1(), initSize, insertSize))
}

On my machine it prints:

V2: 3.6664ms
V1: 90.6585ms

If you increase initSize and insertSize further, you will find that the difference between V1 and V2 grows to more than 30x, and V2 also uses less memory.
I think a cardinality greater than 100,000 is quite normal; at least in our environment, there are many labels with a cardinality over 1,000,000.


In fact, I noticed that sharding already exists here:

const indexShards = 32

type InvertedIndex struct {
	shards []indexShard
}

It effectively reduces lock contention (I would call it lock segmentation, or lock striping), but when indexShards is large it actually reduces query efficiency, so it does not conflict with this PR.
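
For illustration, a minimal sketch of that lock-striping idea; the field layout and the modulo-based shard picker here are illustrative rather than copied from Cortex:

package index

import (
	"sync"

	"github.com/prometheus/common/model"
)

const indexShards = 32

// indexShard carries its own mutex, so writers touching different fingerprints
// contend on different locks instead of a single global one.
type indexShard struct {
	mtx sync.RWMutex
	// per-shard inverted-index state lives here
}

type InvertedIndex struct {
	shards []indexShard
}

func New() *InvertedIndex {
	return &InvertedIndex{shards: make([]indexShard, indexShards)}
}

// getShard picks the shard that owns a fingerprint.
func (ii *InvertedIndex) getShard(fp model.Fingerprint) *indexShard {
	return &ii.shards[uint64(fp)%indexShards]
}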

Contributor

@pstibrany pstibrany left a comment


Thank you, that looks really interesting. I have left some comments in the code.

Would it be possible to write a benchmark adding the same number of metrics and comparing values before and after this change? (I see you already wrote a comparison above... can you add it as a benchmark to the PR, please?)

func (c *indexValueEntry) shard(fp model.Fingerprint) int {
	return int(math.Floor(float64(len(c.shards)) * float64(fp) / math.MaxUint64))
}
Contributor


This will return len(c.shards) for values close to math.MaxUint64, which will then cause panics.

Perhaps the simplest solution would be to use fp % len(c.shards) as the shard number. (That would require a little bit of extra effort when iterating them in sorted order.)
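
(For reference, a minimal standalone check of this boundary case; numShards here is just an illustrative value:)

package main

import (
	"fmt"
	"math"
)

func main() {
	const numShards = 200
	fp := uint64(math.MaxUint64)
	// float64(math.MaxUint64) rounds up to 2^64, so the ratio below is exactly 1.0 and,
	// without a clamp, the computed index equals numShards, one past the last shard.
	n := int(math.Floor(float64(numShards) * float64(fp) / math.MaxUint64))
	fmt.Println(n) // 200
}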

Contributor Author


Thanks for the correction, I made a basic mistake. This really illustrates the importance of code review.

Perhaps the simplest solution would be to use fp % len(c.shards) as the shard number. (That would require a little bit of extra effort when iterating them in sorted order.)

Breaking the ordering across shards would bring additional costs. I think the simplest fix is this:

func (c *indexValueEntry) shard(fp model.Fingerprint) int {
	n := int(math.Floor(float64(len(c.shards)) * float64(fp) / math.MaxUint64))
	if n == len(c.shards) {
		n = len(c.shards) - 1
	}
	return n
}

What is your opinion?

j := sort.Search(len(fps), func(i int) bool {
	return fps[i] >= fp
})
c.shards[num] = fps[:j+copy(fps[j:], fps[j+1:])]
Contributor


If the element is not in fps, this will delete the wrong entry or panic. (The original code does the same.)

Contributor Author


Indeed, I copied the previous code directly. I've now added a guard here:

if len(fps) == j {
    return
}
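
As a standalone sketch of the fuller guard the review comment points at, one could also skip fingerprints that are simply not present in the shard; remove here is a free function over a single shard's slice rather than the PR's actual method:

package main

import (
	"fmt"
	"sort"

	"github.com/prometheus/common/model"
)

// remove deletes fp from a sorted slice, but only if it is actually present,
// so a lookup miss neither panics nor drops the wrong entry.
func remove(fps []model.Fingerprint, fp model.Fingerprint) []model.Fingerprint {
	j := sort.Search(len(fps), func(i int) bool {
		return fps[i] >= fp
	})
	if j == len(fps) || fps[j] != fp {
		return fps // fp is not in this slice: nothing to delete
	}
	return fps[:j+copy(fps[j:], fps[j+1:])]
}

func main() {
	fps := []model.Fingerprint{1, 3, 5}
	fmt.Println(remove(fps, 4)) // [1 3 5]
	fmt.Println(remove(fps, 3)) // [1 5]
}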

func (c *indexValueEntry) fps() []model.Fingerprint {
	var fps []model.Fingerprint
Contributor


To save allocations, we can preallocate this slice with a length of c.length().

Contributor Author


that's a good idea
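
For example, a sketch of what that could look like, assuming c.length() sums the lengths of all shards:

func (c *indexValueEntry) fps() []model.Fingerprint {
	fps := make([]model.Fingerprint, 0, c.length()) // single allocation up front
	for _, shard := range c.shards {
		fps = append(fps, shard...)
	}
	return fps
}

Since the shards cover ascending, non-overlapping ranges of the fingerprint space and each one is kept sorted, plain concatenation keeps the result in order.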

j := sort.Search(len(fps), func(i int) bool {
	return fps[i] >= fp
})
fps = append(fps, 0)
Contributor

@pstibrany pstibrany Aug 28, 2020


Similar to delete, we don't check whether fp is already at the found position, and add it anyway. (But it seems adding duplicates is fine, based on the original code.)

Contributor Author


The upper-level logic ensures that there will be no duplicate fingerprints here:
https://github.com/cortexproject/cortex/blob/master/pkg/ingester/user_state.go#L178-L181

@pracucci
Contributor

Would it be possible to write a benchmark adding the same number of metrics and comparing values before and after this change?

I would also be very interested in this, benchmarking both CPU and memory. For example, the following change may have an impact:

-	fps  map[string]indexValueEntry
+	fps  map[string]*indexValueEntry
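
For context, a minimal sketch of the mechanical difference, with the type reduced to just the slice field (this says nothing about which variant is faster; that is what the benchmark would show). With a value entry, every update copies the struct out of the map and writes the whole struct back; with a pointer entry, the slice header is mutated in place:

package main

import "github.com/prometheus/common/model"

type indexValueEntry struct {
	fps []model.Fingerprint
}

func main() {
	byValue := map[string]indexValueEntry{}
	e := byValue["metric"] // copies the struct out of the map
	e.fps = append(e.fps, 1)
	byValue["metric"] = e // the whole struct has to be written back

	byPointer := map[string]*indexValueEntry{}
	byPointer["metric"] = &indexValueEntry{}
	byPointer["metric"].fps = append(byPointer["metric"].fps, 1) // mutated in place
}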

@storyicon
Contributor Author

Would it be possible to write a benchmark adding the same number of metrics and comparing values before and after this change?

I would also be very interested in this, benchmarking both CPU and memory. For example, the following change may have an impact:

-	fps  map[string]indexValueEntry
+	fps  map[string]*indexValueEntry

Hmmm, the benchmark I will commit is not affected by this.


storyicon commented Aug 28, 2020

Thank you, that looks really interesting. I have left some comments in the code.

Would it be possible to write a benchmark adding the same number of metrics and comparing values before and after this change? (I see you already wrote a comparison above... can you add it as a benchmark to the PR, please?)

I have submitted a change that contains a simple benchmark; here is the result of running it:

$ go test -bench=BenchmarkIndexValueEntry_Add  -benchmem
goos: windows
goarch: amd64
pkg: github.com/cortexproject/cortex/pkg/ingester/index
BenchmarkIndexValueEntry_Add/shard-12            2455293              2992 ns/op              39 B/op          0 allocs/op
BenchmarkIndexValueEntry_Add/plain-12            1000000            182646 ns/op              45 B/op          0 allocs/op
PASS
ok      github.com/cortexproject/cortex/pkg/ingester/index      190.950s

If my benchmark is written correctly, sharding increases the insertion speed by about 60x, which is close to the results I measured in my production environment.
In addition, we can see that memory consumption is lower after sharding, which is consistent with how slices grow.


bboreham commented Sep 3, 2020

Each series contains only one sample and about 10 labels.

Cortex (and Prometheus) is designed to handle timeseries that have many values over hours; if you have only one sample in a series then metadata (labels, etc) dominates storage and processing time.

Don't you find that the IO costs of putting this on disk are enormous? I suspect an entirely different storage scheme would be needed to make this style of data manageable.


storyicon commented Sep 3, 2020

Sorry, I meant that there is only one sample per series in each push request; samples accumulate over time.


bboreham commented Sep 3, 2020

Based on further discussion on Slack, I think you are focused on the period when Cortex sees a lot of series for the first time, e.g. after an ingester is restarted, or when a large (tens of millions of series) Prometheus starts sending for the first time.

This is a valid concern, though perhaps one that has slipped past other large users. Possibly other users are scaled more horizontally - the copying effect you describe keeps growing as the number of series in one index shard grows.

I think the same effect should be observable in the ingester hand-over process, where millions of series will be created in a single transaction.

@storyicon
Contributor Author

@bboreham Sorry, I was busy with some other things over the past few days, and only saw your reply here today.

In order to make the comparison clearer, I re-tested under the same conditions:

  1. Before the fix
    [screenshot]

    A large amount of Prometheus remote-write latency lasted for about 2 hours; this problem alone is serious enough.

  2. After the fix
    [screenshot]

    Accumulating the fingerprints only takes about 10 minutes, and the short-lived latency is acceptable.

The test data used above comes from our production environment (not special stress-test data) and only accounts for about 20% of the total. (This is also the data source I was using before.)

When most fingerprints already exist in memory, both versions become much faster, but the modified version is still always a little faster, and if a new batch of data arrives at that point, the fixed version can digest it quickly.
In any case, there is a two-hour backlog before the fix, which makes it basically unusable.
I admit that scaling out can help reduce this "start-up" time, but scaling out is not free for Cortex; for example, to reduce data skew we need to turn on ShardByAllLabels, which causes the Querier to query all Ingester nodes.
The ability to scale horizontally does not conflict with improving the performance of each single Ingester. This fix may only show its advantage when the amount of data is large enough, but with the growth of microservices the amount of data in every company is expanding rapidly, and a large Prometheus remote-write backlog may make users worry about Cortex performance.


stale bot commented Nov 6, 2020

This issue has been automatically marked as stale because it has not had any activity in the past 60 days. It will be closed in 15 days if no further activity occurs. Thank you for your contributions.

Development

Successfully merging this pull request may close these issues.

The throughput of ingester causes a lot of remote write latency
4 participants