From ec3aeafe407e17a23234a1738e0c37a12acc1eb8 Mon Sep 17 00:00:00 2001 From: Ken Haines Date: Wed, 5 Feb 2020 09:15:57 -0800 Subject: [PATCH 1/4] Adding Microsoft Azure backend support for TSDB storage engine. Signed-off-by: Ken Haines --- CHANGELOG.md | 22 +++++++-------- docs/operations/blocks-storage.md | 24 +++++++++++++++-- .../tsdb/backend/azure/bucket_client.go | 27 +++++++++++++++++++ pkg/storage/tsdb/backend/azure/config.go | 23 ++++++++++++++++ pkg/storage/tsdb/bucket_client.go | 3 +++ pkg/storage/tsdb/config.go | 12 ++++++--- 6 files changed, 95 insertions(+), 16 deletions(-) create mode 100644 pkg/storage/tsdb/backend/azure/bucket_client.go create mode 100644 pkg/storage/tsdb/backend/azure/config.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 2db782cb20f..c1d222dd1aa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ * [CHANGE] Removed unnecessary `frontend.cache-split-interval` in favor of `querier.split-queries-by-interval` both to reduce configuration complexity and guarantee alignment of these two configs. Starting from now, `-querier.cache-results` may only be enabled in conjunction with `-querier.split-queries-by-interval` (previously the cache interval default was `24h` so if you want to preserve the same behaviour you should set `-querier.split-queries-by-interval=24h`). #2040 * [CHANGE] Removed remaining support for using denormalised tokens in the ring. If you're still running ingesters with denormalised tokens (Cortex 0.4 or earlier, with `-ingester.normalise-tokens=false`), such ingesters will now be completely invisible to distributors and need to be either switched to Cortex 0.6.0 or later, or be configured to use normalised tokens. #2034 -* [CHANGE] Moved `--store.min-chunk-age` to the Querier config as `--querier.query-store-after`, allowing the store to be skipped during query time if the metrics wouldn't be found. The YAML config option `ingestermaxquerylookback` has been renamed to `query_ingesters_within` to match its CLI flag. #1893 +* [CHANGE] Moved `--store.min-chunk-age` to the Querier config as `--querier.query-store-after`, allowing the store to be skipped during query time if the metrics wouldn't be found. The YAML config option `ingestermaxquerylookback` has been renamed to `query_ingesters_within` to match its CLI flag. #1893 * `--store.min-chunk-age` has been removed * `--querier.query-store-after` has been added in it's place. * [FEATURE] Added user sub rings to distribute users to a subset of ingesters. #1947 @@ -13,12 +13,13 @@ * [ENHANCEMENT] Experimental TSDB: Export TSDB Syncer metrics from Compactor component, they are prefixed with `cortex_compactor_`. #2023 * [ENHANCEMENT] Experimental TSDB: Added dedicated flag `-experimental.tsdb.bucket-store.tenant-sync-concurrency` to configure the maximum number of concurrent tenants for which blocks are synched. #2026 * [ENHANCEMENT] Experimental TSDB: Expose metrics for objstore operations (prefixed with `cortex__thanos_objstore_`, component being one of `ingester`, `querier` and `compactor`). #2027 +* [ENHANCEMENT] Experiemental TSDB: Added support for Azure Storage to be used for block storage, in addition to S3 and GCS. #2083 * [ENHANCEMENT] Cassanda Storage: added `max_retries`, `retry_min_backoff` and `retry_max_backoff` configuration options to enable retrying recoverable errors. #2054 * [ENHANCEMENT] Allow to configure HTTP and gRPC server listen address, maximum number of simultaneous connections and connection keepalive settings. 
* `-server.http-listen-address` * `-server.http-conn-limit` * `-server.grpc-listen-address` - * `-server.grpc-conn-limit`  + * `-server.grpc-conn-limit` * `-server.grpc.keepalive.max-connection-idle` * `-server.grpc.keepalive.max-connection-age` * `-server.grpc.keepalive.max-connection-age-grace` @@ -75,7 +76,7 @@ Further, if you're using the configs service, we've upgraded the migration libra * [ENHANCEMENT] Added `password` and `enable_tls` options to redis cache configuration. Enables usage of Microsoft Azure Cache for Redis service. #1923 * [ENHANCEMENT] Upgraded Kubernetes API version for deployments from `extensions/v1beta1` to `apps/v1`. #1941 * [ENHANCEMENT] Experimental TSDB: Open existing TSDB on startup to prevent ingester from becoming ready before it can accept writes. The max concurrency is set via `--experimental.tsdb.max-tsdb-opening-concurrency-on-startup`. #1917 -* [ENHANCEMENT] Experimental TSDB: Querier now exports aggregate metrics from Thanos bucket store and in memory index cache (many metrics to list, but all have `cortex_querier_bucket_store_` or `cortex_querier_blocks_index_cache_` prefix). #1996 +* [ENHANCEMENT] Experimental TSDB: Querier now exports aggregate metrics from Thanos bucket store and in memory index cache (many metrics to list, but all have `cortex_querier_bucket_store_` or `cortex_querier_blocks_index_cache_` prefix). #1996 * [ENHANCEMENT] Experimental TSDB: Improved multi-tenant bucket store. #1991 * Allowed to configure the blocks sync interval via `-experimental.tsdb.bucket-store.sync-interval` (0 disables the sync) * Limited the number of tenants concurrently synched by `-experimental.tsdb.bucket-store.block-sync-concurrency` @@ -91,21 +92,20 @@ Further, if you're using the configs service, we've upgraded the migration libra ### Upgrading PostgreSQL (if you're using configs service) -Reference: https://github.com/golang-migrate/migrate/tree/master/database/postgres#upgrading-from-v1 +Reference: -1. Install the migrate package cli tool: https://github.com/golang-migrate/migrate/tree/master/cmd/migrate#installation +1. Install the migrate package cli tool: 2. Drop the `schema_migrations` table: `DROP TABLE schema_migrations;`. 2. Run the migrate command: ```bash -migrate -path /cmd/cortex/migrations -database postgres://localhost:5432/database force 2 +migrate -path /cmd/cortex/migrations -database postgres://localhost:5432/database force 2 ``` ### Known issues - The `cortex_prometheus_rule_group_last_evaluation_timestamp_seconds` metric, tracked by the ruler, is not unregistered for rule groups not being used anymore. This issue will be fixed in the next Cortex release (see [2033](https://github.com/cortexproject/cortex/issues/2033)). - ## 0.4.0 / 2019-12-02 * [CHANGE] The frontend component has been refactored to be easier to re-use. When upgrading the frontend, cache entries will be discarded and re-created with the new protobuf schema. #1734 @@ -138,6 +138,7 @@ migrate -path /cmd/cortex/migrations -database postgre * [BUGFIX] Fixed duplicated series returned when querying both ingesters and store with the experimental TSDB blocks storage. 
#1778 In this release we updated the following dependencies: + - gRPC v1.25.0 (resulted in a drop of 30% CPU usage when compression is on) - jaeger-client v2.20.0 - aws-sdk-go to v1.25.22 @@ -146,21 +147,20 @@ In this release we updated the following dependencies: This release adds support for Redis as an alternative to Memcached, and also includes many optimisations which reduce CPU and memory usage. -* [CHANGE] Gauge metrics were renamed to drop the `_total` suffix. #1685 +* [CHANGE] Gauge metrics were renamed to drop the `_total` suffix. #1685 * In Alertmanager, `alertmanager_configs_total` is now `alertmanager_configs` * In Ruler, `scheduler_configs_total` is now `scheduler_configs` * `scheduler_groups_total` is now `scheduler_groups`. * [CHANGE] `--alertmanager.configs.auto-slack-root` flag was dropped as auto Slack root is not supported anymore. #1597 * [CHANGE] In table-manager, default DynamoDB capacity was reduced from 3,000 units to 1,000 units. We recommend you do not run with the defaults: find out what figures are needed for your environment and set that via `-dynamodb.periodic-table.write-throughput` and `-dynamodb.chunk-table.write-throughput`. * [FEATURE] Add Redis support for caching #1612 -* [FEATURE] Allow spreading chunk writes across multiple S3 buckets #1625 +* [FEATURE] Allow spreading chunk writes across multiple S3 buckets #1625 * [FEATURE] Added `/shutdown` endpoint for ingester to shutdown all operations of the ingester. #1746 * [ENHANCEMENT] Upgraded Prometheus to 2.12.0 and Alertmanager to 0.19.0. #1597 * [ENHANCEMENT] Cortex is now built with Go 1.13 #1675, #1676, #1679 * [ENHANCEMENT] Many optimisations, mostly impacting ingester and querier: #1574, #1624, #1638, #1644, #1649, #1654, #1702 -Full list of changes: https://github.com/cortexproject/cortex/compare/v0.2.0...v0.3.0 - +Full list of changes: ## 0.2.0 / 2019-09-05 diff --git a/docs/operations/blocks-storage.md b/docs/operations/blocks-storage.md index b34d4324ac1..4d72f591a45 100644 --- a/docs/operations/blocks-storage.md +++ b/docs/operations/blocks-storage.md @@ -11,6 +11,7 @@ The supported backends for the blocks storage are: * [Amazon S3](https://aws.amazon.com/s3) * [Google Cloud Storage](https://cloud.google.com/storage/) +* [Microsoft Azure Storage](https://azure.microsoft.com/en-us/services/storage/) _Internally, this storage engine is based on [Thanos](https://thanos.io), but no Thanos knowledge is required in order to run it._ @@ -28,7 +29,6 @@ When the blocks storage is used, each **ingester** creates a per-tenant TSDB and The in-memory samples are periodically flushed to disk - and the WAL truncated - when a new TSDB Block is cut, which by default occurs every 2 hours. Each new Block cut is then uploaded to the long-term storage and kept in the ingester for some more time, in order to give queriers enough time to discover the new Block from the storage and download its index header. - In order to effectively use the **WAL** and being able to recover the in-memory series upon ingester abruptly termination, the WAL needs to be stored to a persistent local disk which can survive in the event of an ingester failure (ie. AWS EBS volume or GCP persistent disk when running in the cloud). For example, if you're running the Cortex cluster in Kubernetes, you may use a StatefulSet with a persistent volume claim for the ingesters. ### The read path @@ -138,7 +138,7 @@ tsdb: # storage. 0 disables the limit. 
# CLI flag: -experimental.tsdb.bucket-store.max-sample-count [max_sample_count: | default = 0] - + # Max number of concurrent queries to execute against the long-term storage # on a per-tenant basis. # CLI flag: -experimental.tsdb.bucket-store.max-concurrent @@ -189,6 +189,26 @@ tsdb: # Google SDK default logic. # CLI flag: -experimental.tsdb.gcs.service-account string [ service_account: ] + + # Configures the Azure storage backend + # Required only when "azure" backend has been selected. + azure: + # Azure storage account name + # CLI flag: -experimental.tsdb.azure.account-name + storage_account: + # Azure storage account key + # CLI flag: -experimental.tsdb.azure.account-key + storage_account_key: + # Azure storage container name + # CLI flag: -experimental.tsdb.azure.container-name + container: + # Azure storage endpoint suffix without schema. + # The account name will be prefixed to this value to create the FQDN + # CLI flag: -experimental.tsdb.azure.endpoint-suffix + endpoint_suffix: + # Number of retries for recoverable errors (default 20) + # CLI flag: -experimental.tsdb.azure.max-retries + [ max_retries: | default=20 ] ``` ### `compactor_config` diff --git a/pkg/storage/tsdb/backend/azure/bucket_client.go b/pkg/storage/tsdb/backend/azure/bucket_client.go new file mode 100644 index 00000000000..d414fe1d251 --- /dev/null +++ b/pkg/storage/tsdb/backend/azure/bucket_client.go @@ -0,0 +1,27 @@ +package azure + +import ( + "github.com/go-kit/kit/log" + "github.com/thanos-io/thanos/pkg/objstore" + "github.com/thanos-io/thanos/pkg/objstore/azure" + yaml "gopkg.in/yaml.v2" +) + +func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucket, error) { + bucketConfig := azure.Config{ + StorageAccountName: cfg.StorageAccountName, + StorageAccountKey: cfg.StorageAccountKey, + ContainerName: cfg.ContainerName, + Endpoint: cfg.Endpoint, + MaxRetries: cfg.MaxRetries, + } + + // Thanos currently doesn't support passing the config as is, but expects a YAML, + // so we're going to serialize it. + serialized, err := yaml.Marshal(bucketConfig) + if err != nil { + return nil, err + } + + return azure.NewBucket(logger, serialized, name) +} diff --git a/pkg/storage/tsdb/backend/azure/config.go b/pkg/storage/tsdb/backend/azure/config.go new file mode 100644 index 00000000000..f1da6aa304e --- /dev/null +++ b/pkg/storage/tsdb/backend/azure/config.go @@ -0,0 +1,23 @@ +package azure + +import ( + "flag" +) + +// Config holds the config options for an Azure backend +type Config struct { + StorageAccountName string `yaml:"storage_account"` + StorageAccountKey string `yaml:"storage_account_key"` + ContainerName string `yaml:"container"` + Endpoint string `yaml:"endpoint_suffix"` + MaxRetries int `yaml:"max_retries"` +} + +// RegisterFlags registers the flags for TSDB Azure storage +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + f.StringVar(&cfg.StorageAccountName, "experimental.tsdb.azure.account-name", "", "Azure storage account name") + f.StringVar(&cfg.StorageAccountKey, "experimental.tsdb.azure.account-key", "", "Azure storage account key") + f.StringVar(&cfg.ContainerName, "experimental.tsdb.azure.container-name", "", "Azure storage container name") + f.StringVar(&cfg.Endpoint, "experimental.tsdb.azure.endpoint-suffix", "", "Azure storage endpoint suffix without schema. 
The account name will be prefixed to this value to create the FQDN") + f.IntVar(&cfg.MaxRetries, "experimental.tsdb.azure.max-retries", 20, "Number of retries for recoverable errors (default 20)") +} diff --git a/pkg/storage/tsdb/bucket_client.go b/pkg/storage/tsdb/bucket_client.go index e6bff903047..9e81bfb6c82 100644 --- a/pkg/storage/tsdb/bucket_client.go +++ b/pkg/storage/tsdb/bucket_client.go @@ -3,6 +3,7 @@ package tsdb import ( "context" + "github.com/cortexproject/cortex/pkg/storage/tsdb/backend/azure" "github.com/cortexproject/cortex/pkg/storage/tsdb/backend/gcs" "github.com/cortexproject/cortex/pkg/storage/tsdb/backend/s3" "github.com/go-kit/kit/log" @@ -16,6 +17,8 @@ func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Lo return s3.NewBucketClient(cfg.S3, name, logger) case BackendGCS: return gcs.NewBucketClient(ctx, cfg.GCS, name, logger) + case BackendAzure: + return azure.NewBucketClient(cfg.Azure, name, logger) default: return nil, errUnsupportedBackend } diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 3f94dfb179e..df9aa44d421 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -8,6 +8,7 @@ import ( "time" "github.com/alecthomas/units" + "github.com/cortexproject/cortex/pkg/storage/tsdb/backend/azure" "github.com/cortexproject/cortex/pkg/storage/tsdb/backend/gcs" "github.com/cortexproject/cortex/pkg/storage/tsdb/backend/s3" ) @@ -19,6 +20,9 @@ const ( // BackendGCS is the value for the GCS storage backend BackendGCS = "gcs" + // BackendAzure is teh value for the Azure storage backend + BackendAzure = "azure" + // TenantIDExternalLabel is the external label set when shipping blocks to the storage TenantIDExternalLabel = "__org_id__" ) @@ -43,8 +47,9 @@ type Config struct { MaxTSDBOpeningConcurrencyOnStartup int `yaml:"max_tsdb_opening_concurrency_on_startup"` // Backends - S3 s3.Config `yaml:"s3"` - GCS gcs.Config `yaml:"gcs"` + S3 s3.Config `yaml:"s3"` + GCS gcs.Config `yaml:"gcs"` + Azure azure.Config `yaml:"azure"` } // DurationList is the block ranges for a tsdb @@ -88,6 +93,7 @@ func (d *DurationList) ToMilliseconds() []int64 { func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.S3.RegisterFlags(f) cfg.GCS.RegisterFlags(f) + cfg.Azure.RegisterFlags(f) cfg.BucketStore.RegisterFlags(f) if len(cfg.BlockRanges) == 0 { @@ -105,7 +111,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { // Validate the config func (cfg *Config) Validate() error { - if cfg.Backend != BackendS3 && cfg.Backend != BackendGCS { + if cfg.Backend != BackendS3 && cfg.Backend != BackendGCS && cfg.Backend != BackendAzure { return errUnsupportedBackend } From cfac884b6039843c2d52a87ca5bdf00126e6bef7 Mon Sep 17 00:00:00 2001 From: Ken Haines Date: Wed, 5 Feb 2020 16:04:38 -0800 Subject: [PATCH 2/4] correcting minor typo caught by linter Signed-off-by: Ken Haines --- docs/architecture.md | 22 ++++++++++++---------- docs/operations/blocks-storage.md | 2 +- pkg/storage/tsdb/backend/azure/config.go | 8 ++++---- pkg/storage/tsdb/config.go | 2 +- 4 files changed, 18 insertions(+), 16 deletions(-) diff --git a/docs/architecture.md b/docs/architecture.md index 907a6fae6eb..7b4bbeebb41 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -33,15 +33,16 @@ The chunks storage stores each single time series into a separate object called For this reason, the chunks storage consists of: * An index for the Chunks. 
This index can be backed by: - * [Amazon DynamoDB](https://aws.amazon.com/dynamodb) - * [Google Bigtable](https://cloud.google.com/bigtable) - * [Apache Cassandra](https://cassandra.apache.org) + * [Amazon DynamoDB](https://aws.amazon.com/dynamodb) + * [Google Bigtable](https://cloud.google.com/bigtable) + * [Apache Cassandra](https://cassandra.apache.org) * An object store for the Chunk data itself, which can be: - * [Amazon DynamoDB](https://aws.amazon.com/dynamodb) - * [Google Bigtable](https://cloud.google.com/bigtable) - * [Apache Cassandra](https://cassandra.apache.org) - * [Amazon S3](https://aws.amazon.com/s3) - * [Google Cloud Storage](https://cloud.google.com/storage/) + * [Amazon DynamoDB](https://aws.amazon.com/dynamodb) + * [Google Bigtable](https://cloud.google.com/bigtable) + * [Apache Cassandra](https://cassandra.apache.org) + * [Amazon S3](https://aws.amazon.com/s3) + * [Google Cloud Storage](https://cloud.google.com/storage/) + * [Microsoft Azure Storage](https://azure.microsoft.com/en-us/services/storage/) Internally, the access to the chunks storage relies on a unified interface called "chunks store". Unlike other Cortex components, the chunk store is not a separate service, but rather a library embedded in the services that need to access the long-term storage: [ingester](#ingester), [querier](#querier) and [ruler](#ruler). @@ -59,6 +60,7 @@ The blocks storage doesn't require a dedicated storage backend for the index. Th * [Amazon S3](https://aws.amazon.com/s3) * [Google Cloud Storage](https://cloud.google.com/storage/) +* [Microsoft Azure Storage](https://azure.microsoft.com/en-us/services/storage/) For more information, please check out the [Blocks storage](operations/blocks-storage.md) documentation. @@ -142,7 +144,7 @@ We recommend randomly load balancing write requests across distributor instances ### Ingester -The **ingester** service is responsible for writing incoming series to a [long-term storage backend](#storage) on the write path and returning in-memory series samples for queries on the read path. +The **ingester** service is responsible for writing incoming series to a [long-term storage backend](#storage) on the write path and returning in-memory series samples for queries on the read path. Incoming series are not immediately written to the storage but kept in memory and periodically flushed to the storage (by default, 12 hours for the chunks storage and 2 hours for the experimental blocks storage). For this reason, the [queriers](#querier) may need to fetch samples both from ingesters and long-term storage while executing a query on the read path. @@ -154,7 +156,7 @@ Ingesters contain a **lifecycler** which manages the lifecycle of an ingester an 3. `ACTIVE` is an ingester's state when it is fully initialized. It may receive both write and read requests for tokens it owns. -4. `LEAVING` is an ingester's state when it is shutting down. It cannot receive write requests anymore, while it could still receive read requests for series it has in memory. While in this state, the ingester may look for a `PENDING` ingester to start a hand-over process with, used to transfer the state from `LEAVING` ingester to the `PENDING` one, during a rolling update (`PENDING` ingester moves to `JOINING` state during hand-over process). If there is no new ingester to accept hand-over, ingester in `LEAVING` state will flush data to storage instead. +4. `LEAVING` is an ingester's state when it is shutting down. 
It cannot receive write requests anymore, while it could still receive read requests for series it has in memory. While in this state, the ingester may look for a `PENDING` ingester to start a hand-over process with, used to transfer the state from `LEAVING` ingester to the `PENDING` one, during a rolling update (`PENDING` ingester moves to `JOINING` state during hand-over process). If there is no new ingester to accept hand-over, ingester in `LEAVING` state will flush data to storage instead. 5. `UNHEALTHY` is an ingester's state when it has failed to heartbeat to the ring's KV Store. While in this state, distributors skip the ingester while building the replication set for incoming series and the ingester does not receive write or read requests. diff --git a/docs/operations/blocks-storage.md b/docs/operations/blocks-storage.md index 4d72f591a45..f471074e00f 100644 --- a/docs/operations/blocks-storage.md +++ b/docs/operations/blocks-storage.md @@ -206,7 +206,7 @@ tsdb: # The account name will be prefixed to this value to create the FQDN # CLI flag: -experimental.tsdb.azure.endpoint-suffix endpoint_suffix: - # Number of retries for recoverable errors (default 20) + # Number of retries for recoverable errors # CLI flag: -experimental.tsdb.azure.max-retries [ max_retries: | default=20 ] ``` diff --git a/pkg/storage/tsdb/backend/azure/config.go b/pkg/storage/tsdb/backend/azure/config.go index f1da6aa304e..3fc29f5ef1b 100644 --- a/pkg/storage/tsdb/backend/azure/config.go +++ b/pkg/storage/tsdb/backend/azure/config.go @@ -6,9 +6,9 @@ import ( // Config holds the config options for an Azure backend type Config struct { - StorageAccountName string `yaml:"storage_account"` - StorageAccountKey string `yaml:"storage_account_key"` - ContainerName string `yaml:"container"` + StorageAccountName string `yaml:"account_name"` + StorageAccountKey string `yaml:"account_key"` + ContainerName string `yaml:"container_name"` Endpoint string `yaml:"endpoint_suffix"` MaxRetries int `yaml:"max_retries"` } @@ -19,5 +19,5 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.StorageAccountKey, "experimental.tsdb.azure.account-key", "", "Azure storage account key") f.StringVar(&cfg.ContainerName, "experimental.tsdb.azure.container-name", "", "Azure storage container name") f.StringVar(&cfg.Endpoint, "experimental.tsdb.azure.endpoint-suffix", "", "Azure storage endpoint suffix without schema. 
The account name will be prefixed to this value to create the FQDN") - f.IntVar(&cfg.MaxRetries, "experimental.tsdb.azure.max-retries", 20, "Number of retries for recoverable errors (default 20)") + f.IntVar(&cfg.MaxRetries, "experimental.tsdb.azure.max-retries", 20, "Number of retries for recoverable errors") } diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index df9aa44d421..f259b3be875 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -20,7 +20,7 @@ const ( // BackendGCS is the value for the GCS storage backend BackendGCS = "gcs" - // BackendAzure is teh value for the Azure storage backend + // BackendAzure is the value for the Azure storage backend BackendAzure = "azure" // TenantIDExternalLabel is the external label set when shipping blocks to the storage From 1e5679a629d937e643cf4a8b921e969867448f9d Mon Sep 17 00:00:00 2001 From: Ken Haines Date: Wed, 5 Feb 2020 21:16:06 -0800 Subject: [PATCH 3/4] updating the vendor's module for thanos to include azure file Signed-off-by: Ken Haines --- .../thanos/pkg/objstore/azure/azure.go | 336 ++++++++++++++++++ .../thanos/pkg/objstore/azure/helpers.go | 80 +++++ vendor/modules.txt | 1 + 3 files changed, 417 insertions(+) create mode 100644 vendor/github.com/thanos-io/thanos/pkg/objstore/azure/azure.go create mode 100644 vendor/github.com/thanos-io/thanos/pkg/objstore/azure/helpers.go diff --git a/vendor/github.com/thanos-io/thanos/pkg/objstore/azure/azure.go b/vendor/github.com/thanos-io/thanos/pkg/objstore/azure/azure.go new file mode 100644 index 00000000000..b5ef93ab3de --- /dev/null +++ b/vendor/github.com/thanos-io/thanos/pkg/objstore/azure/azure.go @@ -0,0 +1,336 @@ +package azure + +import ( + "bytes" + "context" + "io" + "io/ioutil" + "os" + "strings" + "testing" + + blob "github.com/Azure/azure-storage-blob-go/azblob" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/objstore" + yaml "gopkg.in/yaml.v2" +) + +const ( + azureDefaultEndpoint = "blob.core.windows.net" +) + +// Config Azure storage configuration. +type Config struct { + StorageAccountName string `yaml:"storage_account"` + StorageAccountKey string `yaml:"storage_account_key"` + ContainerName string `yaml:"container"` + Endpoint string `yaml:"endpoint"` + MaxRetries int `yaml:"max_retries"` +} + +// Bucket implements the store.Bucket interface against Azure APIs. +type Bucket struct { + logger log.Logger + containerURL blob.ContainerURL + config *Config +} + +// Validate checks to see if any of the config options are set. 
+func (conf *Config) validate() error { + if conf.StorageAccountName == "" || + conf.StorageAccountKey == "" { + return errors.New("invalid Azure storage configuration") + } + if conf.StorageAccountName == "" && conf.StorageAccountKey != "" { + return errors.New("no Azure storage_account specified while storage_account_key is present in config file; both should be present") + } + if conf.StorageAccountName != "" && conf.StorageAccountKey == "" { + return errors.New("no Azure storage_account_key specified while storage_account is present in config file; both should be present") + } + if conf.ContainerName == "" { + return errors.New("no Azure container specified") + } + if conf.Endpoint == "" { + conf.Endpoint = azureDefaultEndpoint + } + if conf.MaxRetries < 0 { + return errors.New("the value of maxretries must be greater than or equal to 0 in the config file") + } + return nil +} + +// NewBucket returns a new Bucket using the provided Azure config. +func NewBucket(logger log.Logger, azureConfig []byte, component string) (*Bucket, error) { + level.Debug(logger).Log("msg", "creating new Azure bucket connection", "component", component) + + var conf Config + if err := yaml.Unmarshal(azureConfig, &conf); err != nil { + return nil, err + } + + if err := conf.validate(); err != nil { + return nil, err + } + + ctx := context.Background() + container, err := createContainer(ctx, conf) + if err != nil { + ret, ok := err.(blob.StorageError) + if !ok { + return nil, errors.Wrapf(err, "Azure API return unexpected error: %T\n", err) + } + if ret.ServiceCode() == "ContainerAlreadyExists" { + level.Debug(logger).Log("msg", "Getting connection to existing Azure blob container", "container", conf.ContainerName) + container, err = getContainer(ctx, conf) + if err != nil { + return nil, errors.Wrapf(err, "cannot get existing Azure blob container: %s", container) + } + } else { + return nil, errors.Wrapf(err, "error creating Azure blob container: %s", container) + } + } else { + level.Info(logger).Log("msg", "Azure blob container successfully created", "address", container) + } + + bkt := &Bucket{ + logger: logger, + containerURL: container, + config: &conf, + } + return bkt, nil +} + +// Iter calls f for each entry in the given directory. The argument to f is the full +// object name including the prefix of the inspected directory. +func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error) error { + + prefix := dir + if prefix != "" && !strings.HasSuffix(prefix, DirDelim) { + prefix += DirDelim + } + + marker := blob.Marker{} + + for i := 1; ; i++ { + list, err := b.containerURL.ListBlobsHierarchySegment(ctx, marker, DirDelim, blob.ListBlobsSegmentOptions{ + Prefix: prefix, + }) + + if err != nil { + return errors.Wrapf(err, "cannot list blobs in directory %s (iteration #%d)", dir, i) + } + + marker = list.NextMarker + + var listNames []string + + for _, blob := range list.Segment.BlobItems { + listNames = append(listNames, blob.Name) + } + + for _, blobPrefix := range list.Segment.BlobPrefixes { + listNames = append(listNames, blobPrefix.Name) + } + + for _, name := range listNames { + if err := f(name); err != nil { + return err + } + } + + // Continue iterating if we are not done. + if !marker.NotDone() { + break + } + + level.Debug(b.logger).Log("msg", "requesting next iteration of listing blobs", "last_entries", len(listNames), "iteration", i) + } + + return nil +} + +// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations. 
+func (b *Bucket) IsObjNotFoundErr(err error) bool { + if err == nil { + return false + } + + errorCode := parseError(err.Error()) + if errorCode == "InvalidUri" || errorCode == "BlobNotFound" { + return true + } + + return false +} + +func (b *Bucket) getBlobReader(ctx context.Context, name string, offset, length int64) (io.ReadCloser, error) { + level.Debug(b.logger).Log("msg", "getting blob", "blob", name, "offset", offset, "length", length) + if len(name) == 0 { + return nil, errors.New("X-Ms-Error-Code: [EmptyContainerName]") + } + exists, err := b.Exists(ctx, name) + if err != nil { + return nil, errors.Wrapf(err, "cannot get blob reader: %s", name) + } + + if !exists { + return nil, errors.New("X-Ms-Error-Code: [BlobNotFound]") + } + + blobURL, err := getBlobURL(ctx, *b.config, name) + if err != nil { + return nil, errors.Wrapf(err, "cannot get Azure blob URL, address: %s", name) + } + var props *blob.BlobGetPropertiesResponse + props, err = blobURL.GetProperties(ctx, blob.BlobAccessConditions{}) + if err != nil { + return nil, errors.Wrapf(err, "cannot get properties for container: %s", name) + } + + var size int64 + // If a length is specified and it won't go past the end of the file, + // then set it as the size. + if length > 0 && length <= props.ContentLength()-offset { + size = length + level.Debug(b.logger).Log("msg", "set size to length", "size", size, "length", length, "offset", offset, "name", name) + } else { + size = props.ContentLength() - offset + level.Debug(b.logger).Log("msg", "set size to go to EOF", "contentlength", props.ContentLength(), "size", size, "length", length, "offset", offset, "name", name) + } + + destBuffer := make([]byte, size) + + if err := blob.DownloadBlobToBuffer(context.Background(), blobURL.BlobURL, offset, size, + destBuffer, blob.DownloadFromBlobOptions{ + BlockSize: blob.BlobDefaultDownloadBlockSize, + Parallelism: uint16(3), + Progress: nil, + RetryReaderOptionsPerBlock: blob.RetryReaderOptions{ + MaxRetryRequests: b.config.MaxRetries, + }, + }, + ); err != nil { + return nil, errors.Wrapf(err, "cannot download blob, address: %s", blobURL.BlobURL) + } + + return ioutil.NopCloser(bytes.NewReader(destBuffer)), nil +} + +// Get returns a reader for the given object name. +func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { + return b.getBlobReader(ctx, name, 0, blob.CountToEnd) +} + +// GetRange returns a new range reader for the given object name and range. +func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { + return b.getBlobReader(ctx, name, off, length) +} + +// ObjectSize returns the size of the specified object. +func (b *Bucket) ObjectSize(ctx context.Context, name string) (uint64, error) { + blobURL, err := getBlobURL(ctx, *b.config, name) + if err != nil { + return 0, errors.Wrapf(err, "cannot get Azure blob URL, blob: %s", name) + } + var props *blob.BlobGetPropertiesResponse + props, err = blobURL.GetProperties(ctx, blob.BlobAccessConditions{}) + if err != nil { + return 0, err + } + return uint64(props.ContentLength()), nil +} + +// Exists checks if the given object exists. 
+func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) { + level.Debug(b.logger).Log("msg", "check if blob exists", "blob", name) + blobURL, err := getBlobURL(ctx, *b.config, name) + if err != nil { + return false, errors.Wrapf(err, "cannot get Azure blob URL, address: %s", name) + } + + if _, err = blobURL.GetProperties(ctx, blob.BlobAccessConditions{}); err != nil { + if b.IsObjNotFoundErr(err) { + return false, nil + } + return false, errors.Wrapf(err, "cannot get properties for Azure blob, address: %s", name) + } + + return true, nil +} + +// Upload the contents of the reader as an object into the bucket. +func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { + level.Debug(b.logger).Log("msg", "Uploading blob", "blob", name) + blobURL, err := getBlobURL(ctx, *b.config, name) + if err != nil { + return errors.Wrapf(err, "cannot get Azure blob URL, address: %s", name) + } + if _, err = blob.UploadStreamToBlockBlob(ctx, r, blobURL, + blob.UploadStreamToBlockBlobOptions{ + BufferSize: 3 * 1024 * 1024, + MaxBuffers: 4, + }, + ); err != nil { + return errors.Wrapf(err, "cannot upload Azure blob, address: %s", name) + } + return nil +} + +// Delete removes the object with the given name. +func (b *Bucket) Delete(ctx context.Context, name string) error { + level.Debug(b.logger).Log("msg", "Deleting blob", "blob", name) + blobURL, err := getBlobURL(ctx, *b.config, name) + if err != nil { + return errors.Wrapf(err, "cannot get Azure blob URL, address: %s", name) + } + + if _, err = blobURL.Delete(ctx, blob.DeleteSnapshotsOptionInclude, blob.BlobAccessConditions{}); err != nil { + return errors.Wrapf(err, "error deleting blob, address: %s", name) + } + return nil +} + +// Name returns Azure container name. +func (b *Bucket) Name() string { + return b.config.ContainerName +} + +// NewTestBucket creates test bkt client that before returning creates temporary bucket. +// In a close function it empties and deletes the bucket. +func NewTestBucket(t testing.TB, component string) (objstore.Bucket, func(), error) { + t.Log("Using test Azure bucket.") + + conf := &Config{ + StorageAccountName: os.Getenv("AZURE_STORAGE_ACCOUNT"), + StorageAccountKey: os.Getenv("AZURE_STORAGE_ACCESS_KEY"), + ContainerName: objstore.CreateTemporaryTestBucketName(t), + } + + bc, err := yaml.Marshal(conf) + if err != nil { + return nil, nil, err + } + + ctx := context.Background() + + bkt, err := NewBucket(log.NewNopLogger(), bc, component) + if err != nil { + t.Errorf("Cannot create Azure storage container:") + return nil, nil, err + } + + return bkt, func() { + objstore.EmptyBucket(t, ctx, bkt) + err = bkt.Delete(ctx, conf.ContainerName) + if err != nil { + t.Logf("deleting bucket failed: %s", err) + } + }, nil +} + +// Close bucket. +func (b *Bucket) Close() error { + return nil +} diff --git a/vendor/github.com/thanos-io/thanos/pkg/objstore/azure/helpers.go b/vendor/github.com/thanos-io/thanos/pkg/objstore/azure/helpers.go new file mode 100644 index 00000000000..86a76f0911b --- /dev/null +++ b/vendor/github.com/thanos-io/thanos/pkg/objstore/azure/helpers.go @@ -0,0 +1,80 @@ +package azure + +import ( + "context" + "fmt" + "net/url" + "regexp" + "time" + + blob "github.com/Azure/azure-storage-blob-go/azblob" +) + +// DirDelim is the delimiter used to model a directory structure in an object store bucket. 
+const DirDelim = "/" + +var errorCodeRegex = regexp.MustCompile(`X-Ms-Error-Code:\D*\[(\w+)\]`) + +func getContainerURL(ctx context.Context, conf Config) (blob.ContainerURL, error) { + c, err := blob.NewSharedKeyCredential(conf.StorageAccountName, conf.StorageAccountKey) + if err != nil { + return blob.ContainerURL{}, err + } + + retryOptions := blob.RetryOptions{ + MaxTries: int32(conf.MaxRetries), + } + if deadline, ok := ctx.Deadline(); ok { + retryOptions.TryTimeout = time.Until(deadline) + } + + p := blob.NewPipeline(c, blob.PipelineOptions{ + Retry: retryOptions, + Telemetry: blob.TelemetryOptions{Value: "Thanos"}, + }) + u, err := url.Parse(fmt.Sprintf("https://%s.%s", conf.StorageAccountName, conf.Endpoint)) + if err != nil { + return blob.ContainerURL{}, err + } + service := blob.NewServiceURL(*u, p) + + return service.NewContainerURL(conf.ContainerName), nil +} + +func getContainer(ctx context.Context, conf Config) (blob.ContainerURL, error) { + c, err := getContainerURL(ctx, conf) + if err != nil { + return blob.ContainerURL{}, err + } + // Getting container properties to check if it exists or not. Returns error which will be parsed further. + _, err = c.GetProperties(ctx, blob.LeaseAccessConditions{}) + return c, err +} + +func createContainer(ctx context.Context, conf Config) (blob.ContainerURL, error) { + c, err := getContainerURL(ctx, conf) + if err != nil { + return blob.ContainerURL{}, err + } + _, err = c.Create( + ctx, + blob.Metadata{}, + blob.PublicAccessNone) + return c, err +} + +func getBlobURL(ctx context.Context, conf Config, blobName string) (blob.BlockBlobURL, error) { + c, err := getContainerURL(ctx, conf) + if err != nil { + return blob.BlockBlobURL{}, err + } + return c.NewBlockBlobURL(blobName), nil +} + +func parseError(errorCode string) string { + match := errorCodeRegex.FindStringSubmatch(errorCode) + if len(match) == 2 { + return match[1] + } + return errorCode +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 85c6e7c2350..9d7a38b7bdf 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -535,6 +535,7 @@ github.com/thanos-io/thanos/pkg/extprom github.com/thanos-io/thanos/pkg/gate github.com/thanos-io/thanos/pkg/model github.com/thanos-io/thanos/pkg/objstore +github.com/thanos-io/thanos/pkg/objstore/azure github.com/thanos-io/thanos/pkg/objstore/gcs github.com/thanos-io/thanos/pkg/objstore/s3 github.com/thanos-io/thanos/pkg/pool From bd2d9aced0a35fff6b9c6397f88ade5ea5b603fb Mon Sep 17 00:00:00 2001 From: Ken Haines Date: Thu, 6 Feb 2020 08:16:35 -0800 Subject: [PATCH 4/4] a few more doc tweaks for consistency Signed-off-by: Ken Haines --- docs/operations/blocks-storage.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/operations/blocks-storage.md b/docs/operations/blocks-storage.md index f471074e00f..93f24bf0073 100644 --- a/docs/operations/blocks-storage.md +++ b/docs/operations/blocks-storage.md @@ -195,13 +195,13 @@ tsdb: azure: # Azure storage account name # CLI flag: -experimental.tsdb.azure.account-name - storage_account: + account_name: # Azure storage account key # CLI flag: -experimental.tsdb.azure.account-key - storage_account_key: + account_key: # Azure storage container name # CLI flag: -experimental.tsdb.azure.container-name - container: + container_name: # Azure storage endpoint suffix without schema. # The account name will be prefixed to this value to create the FQDN # CLI flag: -experimental.tsdb.azure.endpoint-suffix