From 8bbbccd3624fbd651efdb3fe248d928ba26570c8 Mon Sep 17 00:00:00 2001 From: Shvejan Mutheboyina Date: Wed, 4 Mar 2026 06:41:39 +0000 Subject: [PATCH] making query ingesters config per tenant Signed-off-by: Shvejan Mutheboyina --- CHANGELOG.md | 1 + pkg/cortex/cortex.go | 3 + pkg/cortex/modules.go | 2 - pkg/cortex/runtime_config.go | 5 +- pkg/distributor/distributor.go | 3 - pkg/distributor/distributor_test.go | 146 ++++++++++- pkg/distributor/query.go | 4 +- pkg/ingester/ingester.go | 9 +- pkg/ingester/ingester_test.go | 13 +- pkg/querier/blocks_store_queryable.go | 25 +- pkg/querier/blocks_store_queryable_test.go | 236 +++++++++++++++++- pkg/querier/distributor_queryable.go | 26 +- pkg/querier/distributor_queryable_test.go | 132 +++++++++- pkg/querier/parquet_queryable.go | 22 +- pkg/querier/parquet_queryable_test.go | 31 +-- pkg/querier/querier.go | 49 ++-- pkg/querier/querier_test.go | 81 +++--- pkg/util/validation/exporter_test.go | 3 + pkg/util/validation/limits.go | 50 ++++ pkg/util/validation/limits_test.go | 271 +++++++++++++++++++++ 20 files changed, 949 insertions(+), 163 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 16b1d37b5ed..01955390994 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,7 @@ * [ENHANCEMENT] Compactor: Add partition group creation time to visit marker. #7217 * [ENHANCEMENT] Compactor: Add concurrency for partition cleanup and mark block for deletion #7246 * [ENHANCEMENT] Distributor: Validate metric name before removing empty labels. #7253 +* [ENHANCEMENT] Make query ingester within a per tenant configuration. #7160 * [BUGFIX] Distributor: If remote write v2 is disabled, explicitly return HTTP 415 (Unsupported Media Type) for Remote Write V2 requests instead of attempting to parse them as V1. #7238 * [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088 * [BUGFIX] Ruler: Add XFunctions validation support. 
#7111 diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 097c9aa2e02..05998bb04ff 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -218,6 +218,9 @@ func (c *Config) Validate(log log.Logger) error { if err := c.LimitsConfig.Validate(c.NameValidationScheme, c.Distributor.ShardByAllLabels, c.Ingester.ActiveSeriesMetricsEnabled); err != nil { return errors.Wrap(err, "invalid limits config") } + if err := c.LimitsConfig.ValidateQueryLimits("default", c.BlocksStorage.TSDB.CloseIdleTSDBTimeout); err != nil { + return errors.Wrap(err, "invalid query routing config") + } if err := c.ResourceMonitor.Validate(); err != nil { return errors.Wrap(err, "invalid resource-monitor config") } diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index fbc6db605f0..a53be7b355e 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -242,7 +242,6 @@ func (t *Cortex) initOverridesExporter() (services.Service, error) { func (t *Cortex) initDistributorService() (serv services.Service, err error) { t.Cfg.Distributor.DistributorRing.ListenPort = t.Cfg.Server.GRPCListenPort - t.Cfg.Distributor.ShuffleShardingLookbackPeriod = t.Cfg.Querier.ShuffleShardingIngestersLookbackPeriod t.Cfg.Distributor.NameValidationScheme = t.Cfg.NameValidationScheme t.Cfg.IngesterClient.GRPCClientConfig.SignWriteRequestsEnabled = t.Cfg.Distributor.SignWriteRequestsEnabled @@ -495,7 +494,6 @@ func (t *Cortex) initIngesterService() (serv services.Service, err error) { t.Cfg.Ingester.DistributorShardingStrategy = t.Cfg.Distributor.ShardingStrategy t.Cfg.Ingester.DistributorShardByAllLabels = t.Cfg.Distributor.ShardByAllLabels t.Cfg.Ingester.InstanceLimitsFn = ingesterInstanceLimits(t.RuntimeConfig) - t.Cfg.Ingester.QueryIngestersWithin = t.Cfg.Querier.QueryIngestersWithin t.tsdbIngesterConfig() t.Ingester, err = ingester.New(t.Cfg.Ingester, t.OverridesConfig, prometheus.DefaultRegisterer, util_log.Logger, t.ResourceMonitor) diff --git a/pkg/cortex/runtime_config.go 
b/pkg/cortex/runtime_config.go index ea84fb3f525..e1471c33b7d 100644 --- a/pkg/cortex/runtime_config.go +++ b/pkg/cortex/runtime_config.go @@ -76,10 +76,13 @@ func (l runtimeConfigLoader) load(r io.Reader) (any, error) { // only check if target is `all`, `distributor`, "querier", and "ruler" // refer to https://github.com/cortexproject/cortex/issues/6741#issuecomment-3067244929 if overrides != nil { - for _, ul := range overrides.TenantLimits { + for userID, ul := range overrides.TenantLimits { if err := ul.Validate(l.cfg.NameValidationScheme, l.cfg.Distributor.ShardByAllLabels, l.cfg.Ingester.ActiveSeriesMetricsEnabled); err != nil { return nil, err } + if err := ul.ValidateQueryLimits(userID, l.cfg.BlocksStorage.TSDB.CloseIdleTSDBTimeout); err != nil { + return nil, err + } } } } diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 188e6df98df..f294215ded1 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -168,9 +168,6 @@ type Config struct { // this (and should never use it) but this feature is used by other projects built on top of it SkipLabelNameValidation bool `yaml:"-"` - // This config is dynamically injected because defined in the querier config. - ShuffleShardingLookbackPeriod time.Duration `yaml:"-"` - // ZoneResultsQuorumMetadata enables zone results quorum when querying ingester replication set // with metadata APIs (labels names and values for now). When zone awareness is enabled, only results // from quorum number of zones will be included to reduce data merged and improve performance. 
diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index ff3039f53ff..336490ebcb0 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -3262,7 +3262,7 @@ func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, [] if cfg.shuffleShardEnabled { distributorCfg.ShardingStrategy = util.ShardingStrategyShuffle - distributorCfg.ShuffleShardingLookbackPeriod = time.Hour + cfg.limits.ShuffleShardingIngestersLookbackPeriod = time.Hour cfg.limits.IngestionTenantShardSize = cfg.shuffleShardSize } @@ -4640,3 +4640,147 @@ func TestDistributor_BatchTimeoutMetric(t *testing.T) { cortex_distributor_ingester_push_timeouts_total 5 `), "cortex_distributor_ingester_push_timeouts_total")) } +func TestDistributor_ShuffleShardingIngestersLookbackPeriod(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + lookbackPeriod time.Duration + shardSize int + expectedBehavior string + }{ + "lookback disabled (0) should not use shuffle sharding with lookback": { + lookbackPeriod: 0, + shardSize: 3, + expectedBehavior: "no_lookback", + }, + "lookback 1h should include ingesters from past hour": { + lookbackPeriod: 1 * time.Hour, + shardSize: 3, + expectedBehavior: "with_lookback", + }, + "lookback 2h should include ingesters from past 2 hours": { + lookbackPeriod: 2 * time.Hour, + shardSize: 3, + expectedBehavior: "with_lookback", + }, + "shard size 0 should not use shuffle sharding": { + lookbackPeriod: 1 * time.Hour, + shardSize: 0, + expectedBehavior: "no_shuffle_sharding", + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + t.Parallel() + + // Setup distributor with shuffle sharding enabled + limits := &validation.Limits{} + flagext.DefaultValues(limits) + limits.IngestionTenantShardSize = testData.shardSize + limits.ShuffleShardingIngestersLookbackPeriod = model.Duration(testData.lookbackPeriod) + + numIngesters := 10 + ds, _, _, _ := 
prepare(t, prepConfig{ + numIngesters: numIngesters, + happyIngesters: numIngesters, + numDistributors: 1, + shardByAllLabels: true, + shuffleShardSize: testData.shardSize, + shuffleShardEnabled: true, + limits: limits, + }) + + ctx := user.InjectOrgID(context.Background(), "test-user") + + // Get ingesters for query + replicationSet, err := ds[0].GetIngestersForQuery(ctx) + require.NoError(t, err) + + switch testData.expectedBehavior { + case "no_lookback": + // When lookback is disabled, should still use shuffle sharding but without lookback + // This means we get the current shard size + if testData.shardSize > 0 { + assert.LessOrEqual(t, len(replicationSet.Instances), testData.shardSize, + "should not exceed shard size when lookback is disabled") + } + + case "with_lookback": + // When lookback is enabled, should use shuffle sharding with lookback + // This means we might get more ingesters than the shard size + assert.GreaterOrEqual(t, len(replicationSet.Instances), testData.shardSize, + "should include at least shard size ingesters with lookback") + + case "no_shuffle_sharding": + // When shard size is 0, shuffle sharding is disabled + // Should query all ingesters + assert.Equal(t, numIngesters, len(replicationSet.Instances), + "should query all ingesters when shuffle sharding is disabled") + } + }) + } +} + +func TestDistributor_ShuffleShardingIngestersLookbackPeriod_Validation(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + queryStoreAfter time.Duration + shuffleShardingIngestersLookbackPeriod time.Duration + shouldBeValid bool + description string + }{ + "valid: lookback >= queryStoreAfter": { + queryStoreAfter: 1 * time.Hour, + shuffleShardingIngestersLookbackPeriod: 2 * time.Hour, + shouldBeValid: true, + description: "lookback period should be >= queryStoreAfter", + }, + "valid: lookback == queryStoreAfter": { + queryStoreAfter: 1 * time.Hour, + shuffleShardingIngestersLookbackPeriod: 1 * time.Hour, + shouldBeValid: true, + 
description: "lookback period can equal queryStoreAfter", + }, + "invalid: lookback < queryStoreAfter": { + queryStoreAfter: 2 * time.Hour, + shuffleShardingIngestersLookbackPeriod: 1 * time.Hour, + shouldBeValid: false, + description: "lookback period must be >= queryStoreAfter", + }, + "valid: both disabled": { + queryStoreAfter: 0, + shuffleShardingIngestersLookbackPeriod: 0, + shouldBeValid: true, + description: "both can be disabled", + }, + "valid: queryStoreAfter disabled": { + queryStoreAfter: 0, + shuffleShardingIngestersLookbackPeriod: 1 * time.Hour, + shouldBeValid: true, + description: "queryStoreAfter can be disabled while lookback is enabled", + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + t.Parallel() + + limits := &validation.Limits{} + flagext.DefaultValues(limits) + limits.QueryStoreAfter = model.Duration(testData.queryStoreAfter) + limits.ShuffleShardingIngestersLookbackPeriod = model.Duration(testData.shuffleShardingIngestersLookbackPeriod) + + // ValidateQueryLimits requires userID and closeIdleTSDBTimeout + err := limits.ValidateQueryLimits("test-user", 13*time.Hour) + + if testData.shouldBeValid { + assert.NoError(t, err, testData.description) + } else { + assert.Error(t, err, testData.description) + assert.Contains(t, err.Error(), "shuffle_sharding_ingesters_lookback_period", testData.description) + } + }) + } +} diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 9835ab1c822..37cc51a15a5 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -92,7 +92,7 @@ func (d *Distributor) GetIngestersForQuery(ctx context.Context, matchers ...*lab // part of the tenant's subring. 
if d.cfg.ShardingStrategy == util.ShardingStrategyShuffle { shardSize := d.limits.IngestionTenantShardSize(userID) - lookbackPeriod := d.cfg.ShuffleShardingLookbackPeriod + lookbackPeriod := d.limits.ShuffleShardingIngestersLookbackPeriod if shardSize > 0 && lookbackPeriod > 0 { return d.ingestersRing.ShuffleShardWithLookback(userID, shardSize, lookbackPeriod, time.Now()).GetReplicationSetForOperation(ring.Read) @@ -123,7 +123,7 @@ func (d *Distributor) GetIngestersForMetadata(ctx context.Context) (ring.Replica // part of the tenant's subring. if d.cfg.ShardingStrategy == util.ShardingStrategyShuffle { shardSize := d.limits.IngestionTenantShardSize(userID) - lookbackPeriod := d.cfg.ShuffleShardingLookbackPeriod + lookbackPeriod := d.limits.ShuffleShardingIngestersLookbackPeriod if shardSize > 0 && lookbackPeriod > 0 { return d.ingestersRing.ShuffleShardWithLookback(userID, shardSize, lookbackPeriod, time.Now()).GetReplicationSetForOperation(ring.Read) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index d53dc35a573..d935ea0a9e6 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -143,9 +143,6 @@ type Config struct { DistributorShardingStrategy string `yaml:"-"` DistributorShardByAllLabels bool `yaml:"-"` - // Injected at runtime and read from querier config. 
- QueryIngestersWithin time.Duration `yaml:"-"` - DefaultLimits InstanceLimits `yaml:"instance_limits"` InstanceLimitsFn func() *InstanceLimits `yaml:"-"` @@ -1922,7 +1919,7 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu } defer db.releaseReadLock() - mint, maxt, err := metadataQueryRange(startTimestampMs, endTimestampMs, db, i.cfg.QueryIngestersWithin) + mint, maxt, err := metadataQueryRange(startTimestampMs, endTimestampMs, db, i.limits.QueryIngestersWithin(userID)) if err != nil { return nil, cleanup, err } @@ -2038,7 +2035,7 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR } defer db.releaseReadLock() - mint, maxt, err := metadataQueryRange(startTimestampMs, endTimestampMs, db, i.cfg.QueryIngestersWithin) + mint, maxt, err := metadataQueryRange(startTimestampMs, endTimestampMs, db, i.limits.QueryIngestersWithin(userID)) if err != nil { return nil, cleanup, err } @@ -2170,7 +2167,7 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien return cleanup, err } - mint, maxt, err := metadataQueryRange(req.StartTimestampMs, req.EndTimestampMs, db, i.cfg.QueryIngestersWithin) + mint, maxt, err := metadataQueryRange(req.StartTimestampMs, req.EndTimestampMs, db, i.limits.QueryIngestersWithin(userID)) if err != nil { return cleanup, err } diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index eb509e4e352..dc81131a865 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -3712,13 +3712,17 @@ func Test_Ingester_MetricsForLabelMatchers(t *testing.T) { for testName, testData := range tests { t.Run(testName, func(t *testing.T) { + limits := defaultLimitsTestConfig() + limits.QueryIngestersWithin = model.Duration(testData.queryIngestersWithin) + tenantLimits := newMockTenantLimits(map[string]*validation.Limits{"test": &limits}) + i.limits = validation.NewOverrides(limits, tenantLimits) + req := 
&client.MetricsForLabelMatchersRequest{ StartTimestampMs: testData.from, EndTimestampMs: testData.to, MatchersSet: testData.matchers, Limit: testData.limit, } - i.cfg.QueryIngestersWithin = testData.queryIngestersWithin res, err := i.MetricsForLabelMatchers(ctx, req) require.NoError(t, err) assert.ElementsMatch(t, testData.expected, res.Metric) @@ -6327,12 +6331,15 @@ func TestExpendedPostingsCacheMatchers(t *testing.T) { cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour} cfg.BlocksStorageConfig.TSDB.PostingsCache.Blocks.Enabled = true cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled = true - cfg.QueryIngestersWithin = 24 * time.Hour + + limits := defaultLimitsTestConfig() + limits.QueryIngestersWithin = model.Duration(24 * time.Hour) + tenantLimits := newMockTenantLimits(map[string]*validation.Limits{userID: &limits}) ctx := user.InjectOrgID(context.Background(), userID) r := prometheus.NewRegistry() - ing, err := prepareIngesterWithBlocksStorage(t, cfg, r) + ing, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, tenantLimits, "", r) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing)) defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index b3e336a940a..b755f643a2c 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -104,6 +104,7 @@ type BlocksStoreLimits interface { MaxChunksPerQueryFromStore(userID string) int StoreGatewayTenantShardSize(userID string) float64 + QueryStoreAfter(userID string) time.Duration } type blocksStoreQueryableMetrics struct { @@ -133,13 +134,12 @@ func newBlocksStoreQueryableMetrics(reg prometheus.Registerer) *blocksStoreQuery type BlocksStoreQueryable struct { services.Service - stores BlocksStoreSet - finder BlocksFinder - consistency *BlocksConsistencyChecker - logger 
log.Logger - queryStoreAfter time.Duration - metrics *blocksStoreQueryableMetrics - limits BlocksStoreLimits + stores BlocksStoreSet + finder BlocksFinder + consistency *BlocksConsistencyChecker + logger log.Logger + metrics *blocksStoreQueryableMetrics + limits BlocksStoreLimits storeGatewayQueryStatsEnabled bool storeGatewayConsistencyCheckMaxAttempts int @@ -168,7 +168,6 @@ func NewBlocksStoreQueryable( stores: stores, finder: finder, consistency: consistency, - queryStoreAfter: config.QueryStoreAfter, logger: logger, subservices: manager, subservicesWatcher: services.NewFailureWatcher(), @@ -305,7 +304,6 @@ func (q *BlocksStoreQueryable) Querier(mint, maxt int64) (storage.Querier, error limits: q.limits, consistency: q.consistency, logger: q.logger, - queryStoreAfter: q.queryStoreAfter, storeGatewayQueryStatsEnabled: q.storeGatewayQueryStatsEnabled, storeGatewayConsistencyCheckMaxAttempts: q.storeGatewayConsistencyCheckMaxAttempts, storeGatewaySeriesBatchSize: q.storeGatewaySeriesBatchSize, @@ -321,10 +319,6 @@ type blocksStoreQuerier struct { limits BlocksStoreLimits logger log.Logger - // If set, the querier manipulates the max time to not be greater than - // "now - queryStoreAfter" so that most recent blocks are not queried. - queryStoreAfter time.Duration - // If enabled, query stats of store gateway requests will be logged // using `info` level. 
storeGatewayQueryStatsEnabled bool @@ -492,14 +486,15 @@ func (q *blocksStoreQuerier) selectSorted(ctx context.Context, sp *storage.Selec func (q *blocksStoreQuerier) queryWithConsistencyCheck(ctx context.Context, logger log.Logger, minT, maxT int64, matchers []*labels.Matcher, userID string, queryFunc func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error, error)) error { + queryStoreAfter := q.limits.QueryStoreAfter(userID) // If queryStoreAfter is enabled, we do manipulate the query maxt to query samples up until // now - queryStoreAfter, because the most recent time range is covered by ingesters. This // optimization is particularly important for the blocks storage because can be used to skip // querying most recent not-compacted-yet blocks from the storage. - if q.queryStoreAfter > 0 { + if queryStoreAfter > 0 { now := time.Now() origMaxT := maxT - maxT = min(maxT, util.TimeToMillis(now.Add(-q.queryStoreAfter))) + maxT = min(maxT, util.TimeToMillis(now.Add(-queryStoreAfter))) if origMaxT != maxT { level.Debug(logger).Log("msg", "the max time of the query to blocks storage has been manipulated", "original", origMaxT, "updated", maxT) diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index 26a2c2fb4ac..2d910ae06c9 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -50,6 +50,23 @@ import ( "github.com/cortexproject/cortex/pkg/util/validation" ) +type mockTenantLimits struct { + limits map[string]*validation.Limits +} + +func newMockTenantLimits(limits map[string]*validation.Limits) *mockTenantLimits { + return &mockTenantLimits{ + limits: limits, + } +} + +func (l *mockTenantLimits) ByUserID(userID string) *validation.Limits { + return l.limits[userID] +} + +func (l *mockTenantLimits) AllByUserID() map[string]*validation.Limits { + return l.limits +} func TestBlocksStoreQuerier_Select(t *testing.T) { t.Parallel() @@ -2424,15 
+2441,14 @@ func TestBlocksStoreQuerier_SelectSortedShouldHonorQueryStoreAfter(t *testing.T) finder.On("GetBlocks", mock.Anything, "user-1", mock.Anything, mock.Anything, mock.Anything).Return(bucketindex.Blocks(nil), map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), error(nil)) q := &blocksStoreQuerier{ - minT: testData.queryMinT, - maxT: testData.queryMaxT, - finder: finder, - stores: &blocksStoreSetMock{}, - consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil), - logger: log.NewNopLogger(), - metrics: newBlocksStoreQueryableMetrics(nil), - limits: &blocksStoreLimitsMock{}, - queryStoreAfter: testData.queryStoreAfter, + minT: testData.queryMinT, + maxT: testData.queryMaxT, + finder: finder, + stores: &blocksStoreSetMock{}, + consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil), + logger: log.NewNopLogger(), + metrics: newBlocksStoreQueryableMetrics(nil), + limits: &blocksStoreLimitsMock{}, } sp := &storage.SelectHints{ @@ -2550,7 +2566,6 @@ func TestBlocksStoreQuerier_PromQLExecution(t *testing.T) { // Instance the querier that will be executed to run the query. 
cfg := Config{ - QueryStoreAfter: 0, StoreGatewayQueryStatsEnabled: false, StoreGatewayConsistencyCheckMaxAttempts: 3, } @@ -2718,6 +2733,7 @@ func (m *storeGatewaySeriesClientMock) Recv() (*storepb.SeriesResponse, error) { type blocksStoreLimitsMock struct { maxChunksPerQuery int storeGatewayTenantShardSize float64 + queryStoreAfter time.Duration } func (m *blocksStoreLimitsMock) MaxChunksPerQueryFromStore(_ string) int { @@ -2728,6 +2744,10 @@ func (m *blocksStoreLimitsMock) StoreGatewayTenantShardSize(_ string) float64 { return m.storeGatewayTenantShardSize } +func (m *blocksStoreLimitsMock) QueryStoreAfter(_ string) time.Duration { + return m.queryStoreAfter +} + func (m *blocksStoreLimitsMock) S3SSEType(_ string) string { return "" } @@ -2989,3 +3009,199 @@ func createAggrChunk(t *testing.T, step time.Duration, from model.Time, points i }, } } + +func TestBlocksStoreQuerier_MultiTenantQueryStoreAfter(t *testing.T) { + t.Parallel() + + now := time.Now() + + tests := map[string]struct { + queryStoreAfter time.Duration + queryMinT int64 + queryMaxT int64 + expectedMinT int64 + expectedMaxT int64 + description string + }{ + "30m cutoff: should manipulate recent query": { + queryStoreAfter: 30 * time.Minute, + queryMinT: util.TimeToMillis(now.Add(-2 * time.Hour)), + queryMaxT: util.TimeToMillis(now), + expectedMinT: util.TimeToMillis(now.Add(-2 * time.Hour)), + expectedMaxT: util.TimeToMillis(now.Add(-30 * time.Minute)), + description: "tenant with 30m cutoff should query blocks up to 30m ago", + }, + "2h cutoff: should manipulate recent query": { + queryStoreAfter: 2 * time.Hour, + queryMinT: util.TimeToMillis(now.Add(-5 * time.Hour)), + queryMaxT: util.TimeToMillis(now), + expectedMinT: util.TimeToMillis(now.Add(-5 * time.Hour)), + expectedMaxT: util.TimeToMillis(now.Add(-2 * time.Hour)), + description: "tenant with 2h cutoff should query blocks up to 2h ago", + }, + "disabled: should not manipulate time range": { + queryStoreAfter: 0, + queryMinT: 
util.TimeToMillis(now.Add(-5 * time.Hour)), + queryMaxT: util.TimeToMillis(now), + expectedMinT: util.TimeToMillis(now.Add(-5 * time.Hour)), + expectedMaxT: util.TimeToMillis(now), + description: "disabled queryStoreAfter should not manipulate time range", + }, + "1h cutoff: query already old should not be manipulated": { + queryStoreAfter: 1 * time.Hour, + queryMinT: util.TimeToMillis(now.Add(-3 * time.Hour)), + queryMaxT: util.TimeToMillis(now.Add(-2 * time.Hour)), + expectedMinT: util.TimeToMillis(now.Add(-3 * time.Hour)), + expectedMaxT: util.TimeToMillis(now.Add(-2 * time.Hour)), + description: "query already older than cutoff should not be manipulated", + }, + "2h cutoff: recent query should be skipped": { + queryStoreAfter: 2 * time.Hour, + queryMinT: util.TimeToMillis(now.Add(-1 * time.Hour)), + queryMaxT: util.TimeToMillis(now), + expectedMinT: 0, + expectedMaxT: 0, + description: "query entirely within cutoff period should be skipped", + }, + "1h cutoff: partial overlap should manipulate": { + queryStoreAfter: 1 * time.Hour, + queryMinT: util.TimeToMillis(now.Add(-90 * time.Minute)), + queryMaxT: util.TimeToMillis(now.Add(-30 * time.Minute)), + expectedMinT: util.TimeToMillis(now.Add(-90 * time.Minute)), + expectedMaxT: util.TimeToMillis(now.Add(-60 * time.Minute)), + description: "query partially overlapping cutoff should be manipulated", + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + t.Parallel() + + ctx := user.InjectOrgID(context.Background(), "test-tenant") + finder := &blocksFinderMock{} + finder.On("GetBlocks", mock.Anything, "test-tenant", mock.Anything, mock.Anything, mock.Anything).Return(bucketindex.Blocks(nil), map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), error(nil)) + + q := &blocksStoreQuerier{ + minT: testData.queryMinT, + maxT: testData.queryMaxT, + finder: finder, + stores: &blocksStoreSetMock{}, + consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil), + logger: 
log.NewNopLogger(), + metrics: newBlocksStoreQueryableMetrics(nil), + limits: &blocksStoreLimitsMock{queryStoreAfter: testData.queryStoreAfter}, + } + + sp := &storage.SelectHints{ + Start: testData.queryMinT, + End: testData.queryMaxT, + } + + set := q.selectSorted(ctx, sp) + require.NoError(t, set.Err()) + + if testData.expectedMinT == 0 && testData.expectedMaxT == 0 { + assert.Len(t, finder.Calls, 0, testData.description) + } else { + require.Len(t, finder.Calls, 1, testData.description) + assert.Equal(t, testData.expectedMinT, finder.Calls[0].Arguments.Get(2), testData.description) + assert.InDelta(t, testData.expectedMaxT, finder.Calls[0].Arguments.Get(3), float64(5*time.Second.Milliseconds()), testData.description) + } + }) + } +} + +func TestBlocksStoreQuerier_QueryStoreAfterBoundary(t *testing.T) { + t.Parallel() + + now := time.Now() + cutoff := 1 * time.Hour + + tests := map[string]struct { + queryMinT int64 + queryMaxT int64 + expectedMinT int64 + expectedMaxT int64 + shouldSkip bool + description string + }{ + "maxT exactly at cutoff boundary": { + queryMinT: util.TimeToMillis(now.Add(-2 * cutoff)), + queryMaxT: util.TimeToMillis(now.Add(-cutoff)), + expectedMinT: util.TimeToMillis(now.Add(-2 * cutoff)), + expectedMaxT: util.TimeToMillis(now.Add(-cutoff)), + shouldSkip: false, + description: "should not manipulate when maxT is exactly at boundary", + }, + "maxT 1ms before cutoff boundary": { + queryMinT: util.TimeToMillis(now.Add(-2 * cutoff)), + queryMaxT: util.TimeToMillis(now.Add(-cutoff - time.Millisecond)), + expectedMinT: util.TimeToMillis(now.Add(-2 * cutoff)), + expectedMaxT: util.TimeToMillis(now.Add(-cutoff - time.Millisecond)), + shouldSkip: false, + description: "should not manipulate when maxT is before boundary", + }, + "maxT 1ms after cutoff boundary": { + queryMinT: util.TimeToMillis(now.Add(-2 * cutoff)), + queryMaxT: util.TimeToMillis(now.Add(-cutoff + time.Millisecond)), + expectedMinT: util.TimeToMillis(now.Add(-2 * cutoff)), + 
expectedMaxT: util.TimeToMillis(now.Add(-cutoff)), + shouldSkip: false, + description: "should manipulate when maxT is 1ms after boundary", + }, + "minT 1ms before cutoff boundary": { + queryMinT: util.TimeToMillis(now.Add(-cutoff - time.Millisecond)), + queryMaxT: util.TimeToMillis(now), + expectedMinT: util.TimeToMillis(now.Add(-cutoff - time.Millisecond)), + expectedMaxT: util.TimeToMillis(now.Add(-cutoff)), + shouldSkip: false, + description: "should manipulate when minT is before boundary", + }, + "minT well after cutoff boundary": { + queryMinT: util.TimeToMillis(now.Add(-30 * time.Minute)), + queryMaxT: util.TimeToMillis(now), + expectedMinT: 0, + expectedMaxT: 0, + shouldSkip: true, + description: "should skip query when minT is well after boundary", + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + t.Parallel() + + ctx := user.InjectOrgID(context.Background(), "test") + finder := &blocksFinderMock{} + finder.On("GetBlocks", mock.Anything, "test", mock.Anything, mock.Anything, mock.Anything).Return(bucketindex.Blocks(nil), map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), error(nil)) + + q := &blocksStoreQuerier{ + minT: testData.queryMinT, + maxT: testData.queryMaxT, + finder: finder, + stores: &blocksStoreSetMock{}, + consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil), + logger: log.NewNopLogger(), + metrics: newBlocksStoreQueryableMetrics(nil), + limits: &blocksStoreLimitsMock{queryStoreAfter: cutoff}, + } + + sp := &storage.SelectHints{ + Start: testData.queryMinT, + End: testData.queryMaxT, + } + + set := q.selectSorted(ctx, sp) + require.NoError(t, set.Err()) + + if testData.shouldSkip { + assert.Len(t, finder.Calls, 0, testData.description) + } else { + require.Len(t, finder.Calls, 1, testData.description) + assert.Equal(t, testData.expectedMinT, finder.Calls[0].Arguments.Get(2), testData.description) + // Use InDelta for maxT since the implementation uses time.Now() which may differ 
slightly + assert.InDelta(t, testData.expectedMaxT, finder.Calls[0].Arguments.Get(3), float64(5*time.Second.Milliseconds()), testData.description) + } + }) + } +} diff --git a/pkg/querier/distributor_queryable.go b/pkg/querier/distributor_queryable.go index f57f32bda55..32f728f743a 100644 --- a/pkg/querier/distributor_queryable.go +++ b/pkg/querier/distributor_queryable.go @@ -23,6 +23,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/chunkcompat" "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/cortexproject/cortex/pkg/util/users" + "github.com/cortexproject/cortex/pkg/util/validation" ) const retryMinBackoff = time.Millisecond @@ -42,15 +43,15 @@ type Distributor interface { MetricsMetadata(ctx context.Context, req *client.MetricsMetadataRequest) ([]scrape.MetricMetadata, error) } -func newDistributorQueryable(distributor Distributor, streamingMetdata bool, labelNamesWithMatchers bool, iteratorFn chunkIteratorFunc, queryIngestersWithin time.Duration, isPartialDataEnabled partialdata.IsCfgEnabledFunc, ingesterQueryMaxAttempts int) QueryableWithFilter { +func newDistributorQueryable(distributor Distributor, streamingMetdata bool, labelNamesWithMatchers bool, iteratorFn chunkIteratorFunc, isPartialDataEnabled partialdata.IsCfgEnabledFunc, ingesterQueryMaxAttempts int, limits *validation.Overrides) QueryableWithFilter { return distributorQueryable{ distributor: distributor, streamingMetdata: streamingMetdata, labelNamesWithMatchers: labelNamesWithMatchers, iteratorFn: iteratorFn, - queryIngestersWithin: queryIngestersWithin, isPartialDataEnabled: isPartialDataEnabled, ingesterQueryMaxAttempts: ingesterQueryMaxAttempts, + limits: limits, } } @@ -59,9 +60,9 @@ type distributorQueryable struct { streamingMetdata bool labelNamesWithMatchers bool iteratorFn chunkIteratorFunc - queryIngestersWithin time.Duration isPartialDataEnabled partialdata.IsCfgEnabledFunc ingesterQueryMaxAttempts int + limits *validation.Overrides } func (d 
distributorQueryable) Querier(mint, maxt int64) (storage.Querier, error) { @@ -72,15 +73,15 @@ func (d distributorQueryable) Querier(mint, maxt int64) (storage.Querier, error) streamingMetadata: d.streamingMetdata, labelNamesMatchers: d.labelNamesWithMatchers, chunkIterFn: d.iteratorFn, - queryIngestersWithin: d.queryIngestersWithin, isPartialDataEnabled: d.isPartialDataEnabled, ingesterQueryMaxAttempts: d.ingesterQueryMaxAttempts, + limits: d.limits, }, nil } - -func (d distributorQueryable) UseQueryable(now time.Time, _, queryMaxT int64) bool { +func (d distributorQueryable) UseQueryable(now time.Time, userID string, _, queryMaxT int64) bool { // Include ingester only if maxt is within QueryIngestersWithin w.r.t. current time. - return d.queryIngestersWithin == 0 || queryMaxT >= util.TimeToMillis(now.Add(-d.queryIngestersWithin)) + queryIngestersWithin := d.limits.QueryIngestersWithin(userID) + return queryIngestersWithin == 0 || queryMaxT >= util.TimeToMillis(now.Add(-queryIngestersWithin)) } type distributorQuerier struct { @@ -89,9 +90,9 @@ type distributorQuerier struct { streamingMetadata bool labelNamesMatchers bool chunkIterFn chunkIteratorFunc - queryIngestersWithin time.Duration isPartialDataEnabled partialdata.IsCfgEnabledFunc ingesterQueryMaxAttempts int + limits *validation.Overrides } // Select implements storage.Querier interface. @@ -104,15 +105,20 @@ func (q *distributorQuerier) Select(ctx context.Context, sortSeries bool, sp *st if sp != nil { minT, maxT = sp.Start, sp.End } + userID, err := users.TenantID(ctx) + if err != nil { + return storage.ErrSeriesSet(err) + } + queryIngestersWithin := q.limits.QueryIngestersWithin(userID) // We should manipulate the query mint to query samples up until // now - queryIngestersWithin, because older time ranges are covered by the storage. This // optimization is particularly important for the blocks storage where the blocks retention in the // ingesters could be way higher than queryIngestersWithin. 
- if q.queryIngestersWithin > 0 { + if queryIngestersWithin > 0 { now := time.Now() origMinT := minT - minT = max(minT, util.TimeToMillis(now.Add(-q.queryIngestersWithin))) + minT = max(minT, util.TimeToMillis(now.Add(-queryIngestersWithin))) if origMinT != minT { level.Debug(log).Log("msg", "the min time of the query to ingesters has been manipulated", "original", origMinT, "updated", minT) diff --git a/pkg/querier/distributor_queryable_test.go b/pkg/querier/distributor_queryable_test.go index 825da08860f..e7632816f7d 100644 --- a/pkg/querier/distributor_queryable_test.go +++ b/pkg/querier/distributor_queryable_test.go @@ -91,13 +91,15 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T) distributor.On("MetricsForLabelMatchersStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]labels.Labels{}, nil) ctx := user.InjectOrgID(context.Background(), "test") - queryable := newDistributorQueryable(distributor, streamingMetadataEnabled, true, nil, testData.queryIngestersWithin, nil, 1) - querier, err := queryable.Querier(testData.queryMinT, testData.queryMaxT) - require.NoError(t, err) limits := DefaultLimitsConfig() + limits.QueryIngestersWithin = model.Duration(testData.queryIngestersWithin) overrides := validation.NewOverrides(limits, nil) + queryable := newDistributorQueryable(distributor, streamingMetadataEnabled, true, nil, nil, 1, overrides) + querier, err := queryable.Querier(testData.queryMinT, testData.queryMaxT) + require.NoError(t, err) + start, end, err := validateQueryTimeRange(ctx, "test", testData.queryMinT, testData.queryMaxT, overrides, 0) require.NoError(t, err) // Select hints are passed by Prometheus when querying /series. 
@@ -129,18 +131,23 @@ func TestDistributorQueryableFilter(t *testing.T) { t.Parallel() d := &MockDistributor{} - dq := newDistributorQueryable(d, false, true, nil, 1*time.Hour, nil, 1) + + limits := DefaultLimitsConfig() + limits.QueryIngestersWithin = model.Duration(1 * time.Hour) + overrides := validation.NewOverrides(limits, nil) + + dq := newDistributorQueryable(d, false, true, nil, nil, 1, overrides) now := time.Now() queryMinT := util.TimeToMillis(now.Add(-5 * time.Minute)) queryMaxT := util.TimeToMillis(now) - require.True(t, dq.UseQueryable(now, queryMinT, queryMaxT)) - require.True(t, dq.UseQueryable(now.Add(time.Hour), queryMinT, queryMaxT)) + require.True(t, dq.UseQueryable(now, "test", queryMinT, queryMaxT)) + require.True(t, dq.UseQueryable(now.Add(time.Hour), "test", queryMinT, queryMaxT)) // Same query, hour+1ms later, is not sent to ingesters. - require.False(t, dq.UseQueryable(now.Add(time.Hour).Add(1*time.Millisecond), queryMinT, queryMaxT)) + require.False(t, dq.UseQueryable(now.Add(time.Hour).Add(1*time.Millisecond), "test", queryMinT, queryMaxT)) } func TestIngesterStreaming(t *testing.T) { @@ -179,9 +186,13 @@ func TestIngesterStreaming(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "0") - queryable := newDistributorQueryable(d, true, true, batch.NewChunkMergeIterator, 0, func(string) bool { + limits := DefaultLimitsConfig() + limits.QueryIngestersWithin = model.Duration(0) // Disable time filtering for this test + overrides := validation.NewOverrides(limits, nil) + + queryable := newDistributorQueryable(d, true, true, batch.NewChunkMergeIterator, func(string) bool { return partialDataEnabled - }, 1) + }, 1, overrides) querier, err := queryable.Querier(mint, maxt) require.NoError(t, err) @@ -345,9 +356,14 @@ func TestDistributorQuerier_Retry(t *testing.T) { } ingesterQueryMaxAttempts := 3 - queryable := newDistributorQueryable(d, true, true, batch.NewChunkMergeIterator, 0, func(string) bool { + + limits := DefaultLimitsConfig() 
+ limits.QueryIngestersWithin = model.Duration(0) + overrides := validation.NewOverrides(limits, nil) + + queryable := newDistributorQueryable(d, true, true, batch.NewChunkMergeIterator, func(string) bool { return true - }, ingesterQueryMaxAttempts) + }, ingesterQueryMaxAttempts, overrides) querier, err := queryable.Querier(mint, maxt) require.NoError(t, err) @@ -424,9 +440,12 @@ func TestDistributorQuerier_LabelNames(t *testing.T) { Return(metrics, partialDataErr) } - queryable := newDistributorQueryable(d, streamingEnabled, labelNamesWithMatchers, nil, 0, func(string) bool { + limits := DefaultLimitsConfig() + overrides := validation.NewOverrides(limits, nil) + + queryable := newDistributorQueryable(d, streamingEnabled, labelNamesWithMatchers, nil, func(string) bool { return partialDataEnabled - }, 1) + }, 1, overrides) querier, err := queryable.Querier(mint, maxt) require.NoError(t, err) @@ -444,3 +463,90 @@ func TestDistributorQuerier_LabelNames(t *testing.T) { } } } +func TestDistributorQuerier_QueryIngestersWithinBoundary(t *testing.T) { + t.Parallel() + + now := time.Now() + lookback := 1 * time.Hour + + tests := map[string]struct { + queryMinT int64 + queryMaxT int64 + expectedMinT int64 + expectedMaxT int64 + description string + }{ + "query exactly at lookback boundary": { + queryMinT: util.TimeToMillis(now.Add(-lookback)), + queryMaxT: util.TimeToMillis(now), + expectedMinT: util.TimeToMillis(now.Add(-lookback)), + expectedMaxT: util.TimeToMillis(now), + description: "should not manipulate when minT is exactly at boundary", + }, + "query 1ms before lookback boundary": { + queryMinT: util.TimeToMillis(now.Add(-lookback - time.Millisecond)), + queryMaxT: util.TimeToMillis(now), + expectedMinT: util.TimeToMillis(now.Add(-lookback)), + expectedMaxT: util.TimeToMillis(now), + description: "should manipulate when minT is 1ms before boundary", + }, + "query 1ms after lookback boundary": { + queryMinT: util.TimeToMillis(now.Add(-lookback + time.Millisecond)), + 
queryMaxT: util.TimeToMillis(now), + expectedMinT: util.TimeToMillis(now.Add(-lookback + time.Millisecond)), + expectedMaxT: util.TimeToMillis(now), + description: "should not manipulate when minT is 1ms after boundary", + }, + "maxT well before lookback boundary": { + queryMinT: util.TimeToMillis(now.Add(-2 * lookback)), + queryMaxT: util.TimeToMillis(now.Add(-lookback - 10*time.Second)), + expectedMinT: 0, + expectedMaxT: 0, + description: "should skip query when maxT is well before boundary", + }, + "maxT 1ms before lookback boundary": { + queryMinT: util.TimeToMillis(now.Add(-2 * lookback)), + queryMaxT: util.TimeToMillis(now.Add(-lookback - time.Millisecond)), + expectedMinT: 0, + expectedMaxT: 0, + description: "should skip query when maxT is before boundary", + }, + "maxT well after lookback boundary": { + queryMinT: util.TimeToMillis(now.Add(-2 * lookback)), + queryMaxT: util.TimeToMillis(now.Add(-lookback + 10*time.Second)), + expectedMinT: util.TimeToMillis(now.Add(-lookback)), + expectedMaxT: util.TimeToMillis(now.Add(-lookback + 10*time.Second)), + description: "should manipulate when maxT is well after boundary", + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + t.Parallel() + + distributor := &MockDistributor{} + distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil) + + ctx := user.InjectOrgID(context.Background(), "test") + + limits := DefaultLimitsConfig() + limits.QueryIngestersWithin = model.Duration(lookback) + overrides := validation.NewOverrides(limits, nil) + + queryable := newDistributorQueryable(distributor, false, true, nil, nil, 1, overrides) + querier, err := queryable.Querier(testData.queryMinT, testData.queryMaxT) + require.NoError(t, err) + + seriesSet := querier.Select(ctx, true, nil) + require.NoError(t, seriesSet.Err()) + + if testData.expectedMinT == 0 && testData.expectedMaxT == 0 { + assert.Len(t, 
distributor.Calls, 0, testData.description) + } else { + require.Len(t, distributor.Calls, 1, testData.description) + assert.InDelta(t, testData.expectedMinT, int64(distributor.Calls[0].Arguments.Get(1).(model.Time)), float64(5*time.Second.Milliseconds()), testData.description) + assert.Equal(t, testData.expectedMaxT, int64(distributor.Calls[0].Arguments.Get(2).(model.Time)), testData.description) + } + }) + } +} diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index a8f20560443..559f1f1c533 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -98,7 +98,6 @@ type parquetQueryableWithFallback struct { services.Service fallbackDisabled bool - queryStoreAfter time.Duration parquetQueryable storage.Queryable cache parquetutil.CacheInterface[parquet_storage.ParquetShard] blockStorageQueryable *BlocksStoreQueryable @@ -280,7 +279,6 @@ func NewParquetQueryable( blockStorageQueryable: blockStorageQueryable, parquetQueryable: parquetQueryable, cache: cache, - queryStoreAfter: config.QueryStoreAfter, subservicesWatcher: services.NewFailureWatcher(), finder: blockStorageQueryable.finder, metrics: newParquetQueryableFallbackMetrics(reg), @@ -338,7 +336,6 @@ func (p *parquetQueryableWithFallback) Querier(mint, maxt int64) (storage.Querie minT: mint, maxT: maxt, parquetQuerier: pq, - queryStoreAfter: p.queryStoreAfter, blocksStoreQuerier: bsq, finder: p.finder, metrics: p.metrics, @@ -358,10 +355,6 @@ type parquetQuerierWithFallback struct { parquetQuerier storage.Querier blocksStoreQuerier storage.Querier - // If set, the querier manipulates the max time to not be greater than - // "now - queryStoreAfter" so that most recent blocks are not queried. 
- queryStoreAfter time.Duration - // metrics metrics *parquetQueryableFallbackMetrics @@ -503,7 +496,7 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool mint, maxt, limit = hints.Start, hints.End, hints.Limit } - maxt = q.adjustMaxT(maxt) + maxt = q.adjustMaxT(ctx, maxt) hints.End = maxt if maxt < mint { @@ -578,14 +571,19 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool return storage.NewMergeSeriesSet(seriesSets, limit, storage.ChainedSeriesMerge) } -func (q *parquetQuerierWithFallback) adjustMaxT(maxt int64) int64 { +func (q *parquetQuerierWithFallback) adjustMaxT(ctx context.Context, maxt int64) int64 { // If queryStoreAfter is enabled, we do manipulate the query maxt to query samples up until // now - queryStoreAfter, because the most recent time range is covered by ingesters. This // optimization is particularly important for the blocks storage because can be used to skip // querying most recent not-compacted-yet blocks from the storage. 
- if q.queryStoreAfter > 0 { + userID, err := users.TenantID(ctx) + if err != nil { + return maxt + } + queryStoreAfter := q.limits.QueryStoreAfter(userID) + if queryStoreAfter > 0 { now := time.Now() - maxt = min(maxt, util.TimeToMillis(now.Add(-q.queryStoreAfter))) + maxt = min(maxt, util.TimeToMillis(now.Add(-queryStoreAfter))) } return maxt } @@ -603,7 +601,7 @@ func (q *parquetQuerierWithFallback) getBlocks(ctx context.Context, minT, maxT i return nil, nil, err } - maxT = q.adjustMaxT(maxT) + maxT = q.adjustMaxT(ctx, maxT) if maxT < minT { return nil, nil, nil diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index f3d6897b9d9..417bfdc433c 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -15,6 +15,7 @@ import ( "github.com/prometheus-community/parquet-common/convert" "github.com/prometheus-community/parquet-common/schema" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" @@ -104,9 +105,8 @@ func TestParquetQueryableFallbackLogic(t *testing.T) { finder: finder, blocksStoreQuerier: q, parquetQuerier: mParquetQuerier, - queryStoreAfter: time.Hour, metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()), - limits: defaultOverrides(t, 0), + limits: defaultOverridesWithQueryStoreAfter(t, 0, time.Hour), logger: log.NewNopLogger(), defaultBlockStoreType: parquetBlockStore, } @@ -234,9 +234,8 @@ func TestParquetQueryableFallbackLogic(t *testing.T) { finder: finder, blocksStoreQuerier: q, parquetQuerier: mParquetQuerier, - queryStoreAfter: queryStoreAfter, metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()), - limits: defaultOverrides(t, 0), + limits: defaultOverridesWithQueryStoreAfter(t, 0, queryStoreAfter), logger: log.NewNopLogger(), defaultBlockStoreType: parquetBlockStore, } @@ 
-399,7 +398,6 @@ func TestParquetQueryable_Limits(t *testing.T) { bkt, tempDir := cortex_testutil.PrepareFilesystemBucket(t) config := Config{ - QueryStoreAfter: 0, StoreGatewayQueryStatsEnabled: false, StoreGatewayConsistencyCheckMaxAttempts: 3, ParquetShardCache: parquetutil.CacheConfig{ @@ -590,9 +588,14 @@ func convertBlockToParquet(t *testing.T, ctx context.Context, userBucketClient o } func defaultOverrides(t *testing.T, queryVerticalShardSize int) *validation.Overrides { + return defaultOverridesWithQueryStoreAfter(t, queryVerticalShardSize, 0) +} + +func defaultOverridesWithQueryStoreAfter(t *testing.T, queryVerticalShardSize int, queryStoreAfter time.Duration) *validation.Overrides { limits := validation.Limits{} flagext.DefaultValues(&limits) limits.QueryVerticalShardSize = queryVerticalShardSize + limits.QueryStoreAfter = model.Duration(queryStoreAfter) overrides := validation.NewOverrides(limits, nil) return overrides @@ -815,9 +818,8 @@ func TestSelectProjectionHints(t *testing.T) { finder: finder, blocksStoreQuerier: mockTSDBQuerier, parquetQuerier: mockParquetQuerierInstance, - queryStoreAfter: 0, // Disable queryStoreAfter manipulation metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()), - limits: defaultOverrides(t, 0), + limits: defaultOverridesWithQueryStoreAfter(t, 0, 0), // Disable queryStoreAfter manipulation logger: log.NewNopLogger(), defaultBlockStoreType: parquetBlockStore, fallbackDisabled: false, @@ -1082,14 +1084,14 @@ func TestParquetQueryableFallbackDisabled(t *testing.T) { mParquetQuerier := &mockParquetQuerier{} pq := &parquetQuerierWithFallback{ - minT: minT, - maxT: maxT, - finder: finder, - blocksStoreQuerier: q, - parquetQuerier: mParquetQuerier, - queryStoreAfter: time.Hour, + minT: minT, + maxT: maxT, + finder: finder, + blocksStoreQuerier: q, + parquetQuerier: mParquetQuerier, + metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()), - limits: defaultOverrides(t, 0), + limits: 
defaultOverridesWithQueryStoreAfter(t, 0, time.Hour), logger: log.NewNopLogger(), defaultBlockStoreType: parquetBlockStore, fallbackDisabled: true, // Disable fallback @@ -1146,7 +1148,6 @@ func TestParquetQueryableFallbackDisabled(t *testing.T) { finder: finder, blocksStoreQuerier: q, parquetQuerier: mParquetQuerier, - queryStoreAfter: time.Hour, metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()), limits: defaultOverrides(t, 0), logger: log.NewNopLogger(), diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 720e304dfcb..2d42f9f152f 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -46,14 +46,11 @@ type Config struct { IngesterMetadataStreaming bool `yaml:"ingester_metadata_streaming"` IngesterLabelNamesWithMatchers bool `yaml:"ingester_label_names_with_matchers"` MaxSamples int `yaml:"max_samples"` - QueryIngestersWithin time.Duration `yaml:"query_ingesters_within"` EnablePerStepStats bool `yaml:"per_step_stats_enabled"` // Use compression for metrics query API or instant and range query APIs. ResponseCompression string `yaml:"response_compression"` - // QueryStoreAfter the time after which queries should also be sent to the store and not just ingesters. - QueryStoreAfter time.Duration `yaml:"query_store_after"` MaxQueryIntoFuture time.Duration `yaml:"max_query_into_future"` // The default evaluation interval for the promql engine. @@ -87,8 +84,6 @@ type Config struct { // The maximum number of times we attempt fetching data from Ingesters. IngesterQueryMaxAttempts int `yaml:"ingester_query_max_attempts"` - ShuffleShardingIngestersLookbackPeriod time.Duration `yaml:"shuffle_sharding_ingesters_lookback_period"` - ThanosEngine engine.ThanosEngineConfig `yaml:"thanos_engine"` // Ignore max query length check at Querier. 
@@ -136,12 +131,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.IngesterMetadataStreaming, "querier.ingester-metadata-streaming", true, "Deprecated (This feature will be always on after v1.18): Use streaming RPCs for metadata APIs from ingester.") f.BoolVar(&cfg.IngesterLabelNamesWithMatchers, "querier.ingester-label-names-with-matchers", false, "Use LabelNames ingester RPCs with match params.") f.IntVar(&cfg.MaxSamples, "querier.max-samples", 50e6, "Maximum number of samples a single query can load into memory.") - f.DurationVar(&cfg.QueryIngestersWithin, "querier.query-ingesters-within", 0, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.") f.BoolVar(&cfg.EnablePerStepStats, "querier.per-step-stats-enabled", false, "Enable returning samples stats per steps in query response.") f.StringVar(&cfg.ResponseCompression, "querier.response-compression", "gzip", "Use compression for metrics query API or instant and range query APIs. Supported compression 'gzip', 'snappy', 'zstd' and '' (disable compression)") f.DurationVar(&cfg.MaxQueryIntoFuture, "querier.max-query-into-future", 10*time.Minute, "Maximum duration into the future you can query. 0 to disable.") f.DurationVar(&cfg.DefaultEvaluationInterval, "querier.default-evaluation-interval", time.Minute, "The default evaluation interval or step size for subqueries.") - f.DurationVar(&cfg.QueryStoreAfter, "querier.query-store-after", 0, "The time after which a metric should be queried from storage and not just ingesters. 0 means all queries are sent to store. 
When running the blocks storage, if this option is enabled, the time range of the query sent to the store will be manipulated to ensure the query end is not more recent than 'now - query-store-after'.") f.StringVar(&cfg.ActiveQueryTrackerDir, "querier.active-query-tracker-dir", "./active-query-tracker", "Active query tracker monitors active queries, and writes them to the file in given directory. If Cortex discovers any queries in this log during startup, it will log them to the log file. Setting to empty value disables active query tracker, which also disables -querier.max-concurrent option.") f.StringVar(&cfg.StoreGatewayAddresses, "querier.store-gateway-addresses", "", "Comma separated list of store-gateway addresses in DNS Service Discovery format. This option should be set when using the blocks storage and the store-gateway sharding is disabled (when enabled, the store-gateway instances form a ring and addresses are picked from the ring).") f.BoolVar(&cfg.StoreGatewayQueryStatsEnabled, "querier.store-gateway-query-stats-enabled", true, "If enabled, store gateway query stats will be logged using `info` log level.") @@ -149,7 +142,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.Int64Var(&cfg.StoreGatewaySeriesBatchSize, "querier.store-gateway-series-batch-size", 1, "[Experimental] The maximum number of series to be batched in a single gRPC response message from Store Gateways. A value of 0 or 1 disables batching.") f.IntVar(&cfg.IngesterQueryMaxAttempts, "querier.ingester-query-max-attempts", 1, "The maximum number of times we attempt fetching data from ingesters for retryable errors (ex. 
partial data returned).") f.DurationVar(&cfg.LookbackDelta, "querier.lookback-delta", 5*time.Minute, "Time since the last sample after which a time series is considered stale and ignored by expression evaluations.") - f.DurationVar(&cfg.ShuffleShardingIngestersLookbackPeriod, "querier.shuffle-sharding-ingesters-lookback-period", 0, "When distributor's sharding strategy is shuffle-sharding and this setting is > 0, queriers fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since 'now - lookback period'. The lookback period should be greater or equal than the configured 'query store after' and 'query ingesters within'. If this setting is 0, queriers always query all ingesters (ingesters shuffle sharding on read path is disabled).") f.Int64Var(&cfg.MaxSubQuerySteps, "querier.max-subquery-steps", 0, "Max number of steps allowed for every subquery expression in query. Number of steps is calculated using subquery range / step. A value > 0 enables it.") f.BoolVar(&cfg.IgnoreMaxQueryLength, "querier.ignore-max-query-length", false, "If enabled, ignore max query length check at Querier select method. Users can choose to ignore it since the validation can be done before Querier evaluation like at Query Frontend or Ruler.") f.BoolVar(&cfg.EnablePromQLExperimentalFunctions, "querier.enable-promql-experimental-functions", false, "[Experimental] If true, experimental promQL functions are enabled.") @@ -163,23 +155,11 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { // Validate the config func (cfg *Config) Validate() error { - // Ensure the config won't create a situation where no queriers are returned. 
- if cfg.QueryIngestersWithin != 0 && cfg.QueryStoreAfter != 0 { - if cfg.QueryStoreAfter >= cfg.QueryIngestersWithin { - return errBadLookbackConfigs - } - } if cfg.ResponseCompression != "" && cfg.ResponseCompression != "gzip" && cfg.ResponseCompression != "snappy" && cfg.ResponseCompression != "zstd" { return errUnsupportedResponseCompression } - if cfg.ShuffleShardingIngestersLookbackPeriod > 0 { - if cfg.ShuffleShardingIngestersLookbackPeriod < cfg.QueryStoreAfter { - return errShuffleShardingLookbackLessThanQueryStoreAfter - } - } - if cfg.StoreGatewayConsistencyCheckMaxAttempts < 1 { return errInvalidConsistencyCheckAttempts } @@ -221,13 +201,13 @@ func getChunksIteratorFunction(_ Config) chunkIteratorFunc { func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, reg prometheus.Registerer, logger log.Logger, isPartialDataEnabled partialdata.IsCfgEnabledFunc) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, engine.QueryEngine) { iteratorFunc := getChunksIteratorFunction(cfg) - distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, iteratorFunc, cfg.QueryIngestersWithin, isPartialDataEnabled, cfg.IngesterQueryMaxAttempts) + distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, iteratorFunc, isPartialDataEnabled, cfg.IngesterQueryMaxAttempts, limits) ns := make([]QueryableWithFilter, len(stores)) for ix, s := range stores { ns[ix] = storeQueryable{ QueryableWithFilter: s, - QueryStoreAfter: cfg.QueryStoreAfter, + limits: limits, } } queryable := NewQueryable(distributorQueryable, ns, cfg, limits) @@ -297,7 +277,7 @@ type QueryableWithFilter interface { // UseQueryable returns true if this queryable should be used to satisfy the query for given time range. // Query min and max time are in milliseconds since epoch. 
- UseQueryable(now time.Time, queryMinT, queryMaxT int64) bool + UseQueryable(now time.Time, userID string, queryMinT, queryMaxT int64) bool } type limiterHolder struct { @@ -363,12 +343,12 @@ func (q querier) setupFromCtx(ctx context.Context) (context.Context, *querier_st metadataQuerier := dqr queriers := make([]storage.Querier, 0) - if q.distributor.UseQueryable(q.now, mint, maxt) { + if q.distributor.UseQueryable(q.now, userID, mint, maxt) { queriers = append(queriers, dqr) } for _, s := range q.stores { - if !s.UseQueryable(q.now, mint, maxt) { + if !s.UseQueryable(q.now, userID, mint, maxt) { continue } @@ -452,7 +432,7 @@ func (q querier) Select(ctx context.Context, sortSeries bool, sp *storage.Select // Reset projection hints if querying ingesters or projection is not included. // Projection can only be applied when not querying mixed sources (ingester + store). if q.honorProjectionHints { - if !sp.ProjectionInclude || q.distributor.UseQueryable(q.now, mint, maxt) { + if !sp.ProjectionInclude || q.distributor.UseQueryable(q.now, userID, mint, maxt) { sp.ProjectionLabels = nil sp.ProjectionInclude = false } @@ -626,22 +606,27 @@ func (querier) Close() error { type storeQueryable struct { QueryableWithFilter - QueryStoreAfter time.Duration + limits *validation.Overrides } -func (s storeQueryable) UseQueryable(now time.Time, queryMinT, queryMaxT int64) bool { +func (s storeQueryable) UseQueryable(now time.Time, userID string, queryMinT, queryMaxT int64) bool { + var queryStoreAfter time.Duration + if s.limits != nil { + queryStoreAfter = s.limits.QueryStoreAfter(userID) + } + // Include this store only if mint is within QueryStoreAfter w.r.t current time. 
- if s.QueryStoreAfter != 0 && queryMinT > util.TimeToMillis(now.Add(-s.QueryStoreAfter)) { + if queryStoreAfter != 0 && queryMinT > util.TimeToMillis(now.Add(-queryStoreAfter)) { return false } - return s.QueryableWithFilter.UseQueryable(now, queryMinT, queryMaxT) + return s.QueryableWithFilter.UseQueryable(now, userID, queryMinT, queryMaxT) } type alwaysTrueFilterQueryable struct { storage.Queryable } -func (alwaysTrueFilterQueryable) UseQueryable(_ time.Time, _, _ int64) bool { +func (alwaysTrueFilterQueryable) UseQueryable(_ time.Time, _ string, _, _ int64) bool { return true } @@ -655,7 +640,7 @@ type useBeforeTimestampQueryable struct { ts int64 // Timestamp in milliseconds } -func (u useBeforeTimestampQueryable) UseQueryable(_ time.Time, queryMinT, _ int64) bool { +func (u useBeforeTimestampQueryable) UseQueryable(_ time.Time, _ string, queryMinT, _ int64) bool { if u.ts == 0 { return true } diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index ec08fee2ed2..0855fde08c7 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -295,7 +295,12 @@ func TestShouldSortSeriesIfQueryingMultipleQueryables(t *testing.T) { } distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&unorderedResponse, nil) - distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, nil, 1) + + // Create limits with default QueryIngestersWithin + limits := DefaultLimitsConfig() + testOverrides := validation.NewOverrides(limits, nil) + + distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, testOverrides) tCases := []struct { name string @@ -355,7 +360,7 @@ func TestShouldSortSeriesIfQueryingMultipleQueryables(t *testing.T) { for _, queryable := range append(wQueriables, 
wDistributorQueriable) { var wQueryable = queryable.(*wrappedSampleAndChunkQueryable) - if wQueryable.UseQueryable(time.Now(), start.Unix()*1000, end.Unix()*1000) { + if wQueryable.UseQueryable(time.Now(), "0", start.Unix()*1000, end.Unix()*1000) { require.Equal(t, tc.sorted, wQueryable.queriers[0].selectCallsArgs[0][0]) } } @@ -440,7 +445,11 @@ func TestLimits(t *testing.T) { response: &streamResponse, } - distributorQueryableStreaming := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, nil, 1) + // Create limits with default QueryIngestersWithin + limits := DefaultLimitsConfig() + testOverrides := validation.NewOverrides(limits, nil) + + distributorQueryableStreaming := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, testOverrides) tCases := []struct { name string @@ -658,7 +667,6 @@ func TestNoHistoricalQueryToIngester(t *testing.T) { for _, thanosEngine := range []bool{true, false} { for _, encoding := range encodings { for _, c := range testCases { - cfg.QueryIngestersWithin = c.queryIngestersWithin t.Run(fmt.Sprintf("thanosEngine=%t,encoding=%s,queryIngestersWithin=%v, test=%s", thanosEngine, encoding.String(), c.queryIngestersWithin, c.name), func(t *testing.T) { var queryEngine promql.QueryEngine if thanosEngine { @@ -673,7 +681,10 @@ func TestNoHistoricalQueryToIngester(t *testing.T) { chunkStore, _ := makeMockChunkStore(t, 24, encoding) distributor := &errDistributor{} - overrides := validation.NewOverrides(DefaultLimitsConfig(), nil) + // Create limits with QueryIngestersWithin from test case + limits := DefaultLimitsConfig() + limits.QueryIngestersWithin = model.Duration(c.queryIngestersWithin) + overrides := validation.NewOverrides(limits, nil) ctx := user.InjectOrgID(context.Background(), "0") queryable, _, _ := New(cfg, overrides, distributor, 
[]QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))}, nil, log.NewNopLogger(), nil) @@ -1557,14 +1568,16 @@ func TestShortTermQueryToLTS(t *testing.T) { cfg.ActiveQueryTrackerDir = "" for _, c := range testCases { - cfg.QueryIngestersWithin = c.queryIngestersWithin - cfg.QueryStoreAfter = c.queryStoreAfter t.Run(c.name, func(t *testing.T) { //parallel testing causes data race chunkStore := &emptyChunkStore{} distributor := &errDistributor{} - overrides := validation.NewOverrides(DefaultLimitsConfig(), nil) + // Create limits with QueryIngestersWithin and QueryStoreAfter from test case + limits := DefaultLimitsConfig() + limits.QueryIngestersWithin = model.Duration(c.queryIngestersWithin) + limits.QueryStoreAfter = model.Duration(c.queryStoreAfter) + overrides := validation.NewOverrides(limits, nil) queryable, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))}, nil, log.NewNopLogger(), nil) ctx := user.InjectOrgID(context.Background(), "0") @@ -1596,7 +1609,7 @@ func TestUseAlwaysQueryable(t *testing.T) { m := &mockQueryableWithFilter{} qwf := UseAlwaysQueryable(m) - require.True(t, qwf.UseQueryable(time.Now(), 0, 0)) + require.True(t, qwf.UseQueryable(time.Now(), "test", 0, 0)) require.False(t, m.useQueryableCalled) } @@ -1606,13 +1619,13 @@ func TestUseBeforeTimestamp(t *testing.T) { now := time.Now() qwf := UseBeforeTimestampQueryable(m, now.Add(-1*time.Hour)) - require.False(t, qwf.UseQueryable(now, util.TimeToMillis(now.Add(-5*time.Minute)), util.TimeToMillis(now))) + require.False(t, qwf.UseQueryable(now, "test", util.TimeToMillis(now.Add(-5*time.Minute)), util.TimeToMillis(now))) require.False(t, m.useQueryableCalled) - require.False(t, qwf.UseQueryable(now, util.TimeToMillis(now.Add(-1*time.Hour)), util.TimeToMillis(now))) + require.False(t, qwf.UseQueryable(now, "test", util.TimeToMillis(now.Add(-1*time.Hour)), util.TimeToMillis(now))) require.False(t, 
m.useQueryableCalled) - require.True(t, qwf.UseQueryable(now, util.TimeToMillis(now.Add(-1*time.Hour).Add(-time.Millisecond)), util.TimeToMillis(now))) + require.True(t, qwf.UseQueryable(now, "test", util.TimeToMillis(now.Add(-1*time.Hour).Add(-time.Millisecond)), util.TimeToMillis(now))) require.False(t, m.useQueryableCalled) // UseBeforeTimestampQueryable wraps Queryable, and not QueryableWithFilter. } @@ -1620,15 +1633,21 @@ func TestStoreQueryable(t *testing.T) { t.Parallel() m := &mockQueryableWithFilter{} now := time.Now() - sq := storeQueryable{m, time.Hour} - require.False(t, sq.UseQueryable(now, util.TimeToMillis(now.Add(-5*time.Minute)), util.TimeToMillis(now))) + // Create limits with QueryStoreAfter set to 1 hour + limits := DefaultLimitsConfig() + limits.QueryStoreAfter = model.Duration(time.Hour) + overrides := validation.NewOverrides(limits, nil) + + sq := storeQueryable{m, overrides} + + require.False(t, sq.UseQueryable(now, "test", util.TimeToMillis(now.Add(-5*time.Minute)), util.TimeToMillis(now))) require.False(t, m.useQueryableCalled) - require.False(t, sq.UseQueryable(now, util.TimeToMillis(now.Add(-1*time.Hour).Add(time.Millisecond)), util.TimeToMillis(now))) + require.False(t, sq.UseQueryable(now, "test", util.TimeToMillis(now.Add(-1*time.Hour).Add(time.Millisecond)), util.TimeToMillis(now))) require.False(t, m.useQueryableCalled) - require.True(t, sq.UseQueryable(now, util.TimeToMillis(now.Add(-1*time.Hour)), util.TimeToMillis(now))) + require.True(t, sq.UseQueryable(now, "test", util.TimeToMillis(now.Add(-1*time.Hour)), util.TimeToMillis(now))) require.True(t, m.useQueryableCalled) // storeQueryable wraps QueryableWithFilter, so it must call its UseQueryable method. 
} @@ -1641,24 +1660,6 @@ func TestConfig_Validate(t *testing.T) { "should pass with default config": { setup: func(cfg *Config) {}, }, - "should pass if 'query store after' is enabled and shuffle-sharding is disabled": { - setup: func(cfg *Config) { - cfg.QueryStoreAfter = time.Hour - }, - }, - "should pass if 'query store after' is enabled and shuffle-sharding is enabled with greater value": { - setup: func(cfg *Config) { - cfg.QueryStoreAfter = time.Hour - cfg.ShuffleShardingIngestersLookbackPeriod = 2 * time.Hour - }, - }, - "should fail if 'query store after' is enabled and shuffle-sharding is enabled with lesser value": { - setup: func(cfg *Config) { - cfg.QueryStoreAfter = time.Hour - cfg.ShuffleShardingIngestersLookbackPeriod = time.Minute - }, - expected: errShuffleShardingLookbackLessThanQueryStoreAfter, - }, "should fail if invalid parquet queryable default block store": { setup: func(cfg *Config) { cfg.EnableParquetQueryable = true @@ -1731,7 +1732,7 @@ func (m *mockQueryableWithFilter) Querier(_, _ int64) (storage.Querier, error) { return nil, nil } -func (m *mockQueryableWithFilter) UseQueryable(_ time.Time, _, _ int64) bool { +func (m *mockQueryableWithFilter) UseQueryable(_ time.Time, _ string, _, _ int64) bool { m.useQueryableCalled = true return true } @@ -1812,15 +1813,19 @@ func TestQuerier_ProjectionHints(t *testing.T) { distributor := &MockDistributor{} distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil) + // Create limits with default QueryIngestersWithin + limits := DefaultLimitsConfig() + testOverrides := validation.NewOverrides(limits, nil) + // Create distributor queryable that can be controlled to be used or not var distributorQueryable QueryableWithFilter if testData.queryIngesters { // Ingesters will be queried - distributorQueryable = newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, 
batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, nil, 1) + distributorQueryable = newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, testOverrides) } else { // Ingesters will not be queried (time range is too old) distributorQueryable = UseBeforeTimestampQueryable( - newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, nil, 1), + newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, testOverrides), start.Add(-1*time.Hour), ) } @@ -1848,7 +1853,7 @@ func TestQuerier_ProjectionHints(t *testing.T) { var receivedHints *storage.SelectHints for _, queryable := range append([]QueryableWithFilter{storeQueryable}, wDistributorQueryable) { wQueryable := queryable.(*wrappedSampleAndChunkQueryable) - if wQueryable.UseQueryable(time.Now(), util.TimeToMillis(start), util.TimeToMillis(end)) { + if wQueryable.UseQueryable(time.Now(), "0", util.TimeToMillis(start), util.TimeToMillis(end)) { require.Len(t, wQueryable.queriers, 1) require.Len(t, wQueryable.queriers[0].selectCallsArgs, 1) receivedHints = wQueryable.queriers[0].selectCallsArgs[0][1].(*storage.SelectHints) diff --git a/pkg/util/validation/exporter_test.go b/pkg/util/validation/exporter_test.go index 01f96b92750..e5b366a999e 100644 --- a/pkg/util/validation/exporter_test.go +++ b/pkg/util/validation/exporter_test.go @@ -102,7 +102,9 @@ func TestOverridesExporter_withConfig(t *testing.T) { cortex_overrides{limit_name="parquet_max_fetched_chunk_bytes",user="tenant-a"} 0 cortex_overrides{limit_name="parquet_max_fetched_data_bytes",user="tenant-a"} 0 cortex_overrides{limit_name="parquet_max_fetched_row_count",user="tenant-a"} 0 + cortex_overrides{limit_name="query_ingesters_within",user="tenant-a"} 0 
cortex_overrides{limit_name="query_partial_data",user="tenant-a"} 0 + cortex_overrides{limit_name="query_store_after",user="tenant-a"} 0 cortex_overrides{limit_name="query_vertical_shard_size",user="tenant-a"} 0 cortex_overrides{limit_name="reject_old_samples",user="tenant-a"} 0 cortex_overrides{limit_name="reject_old_samples_max_age",user="tenant-a"} 1.2096e+06 @@ -112,6 +114,7 @@ func TestOverridesExporter_withConfig(t *testing.T) { cortex_overrides{limit_name="ruler_query_offset",user="tenant-a"} 0 cortex_overrides{limit_name="ruler_tenant_shard_size",user="tenant-a"} 0 cortex_overrides{limit_name="rules_partial_data",user="tenant-a"} 0 + cortex_overrides{limit_name="shuffle_sharding_ingesters_lookback_period",user="tenant-a"} 0 cortex_overrides{limit_name="store_gateway_tenant_shard_size",user="tenant-a"} 0 `), "cortex_overrides")) } diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 2f14b5cab8c..3d852c1dfc5 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -194,6 +194,12 @@ type Limits struct { MaxQueriersPerTenant float64 `yaml:"max_queriers_per_tenant" json:"max_queriers_per_tenant"` QueryVerticalShardSize int `yaml:"query_vertical_shard_size" json:"query_vertical_shard_size"` QueryPartialData bool `yaml:"query_partial_data" json:"query_partial_data" doc:"nocli|description=Enable to allow queries to be evaluated with data from a single zone, if other zones are not available.|default=false"` + QueryIngestersWithin model.Duration `yaml:"query_ingesters_within" json:"query_ingesters_within"` + + // If set, the querier manipulates the max time to not be greater than + // "now - queryStoreAfter" so that most recent blocks are not queried. 
+	QueryStoreAfter model.Duration `yaml:"query_store_after" json:"query_store_after"` + ShuffleShardingIngestersLookbackPeriod model.Duration `yaml:"shuffle_sharding_ingesters_lookback_period" json:"shuffle_sharding_ingesters_lookback_period"` // Parquet Queryable enforced limits. ParquetMaxFetchedRowCount int `yaml:"parquet_max_fetched_row_count" json:"parquet_max_fetched_row_count"` @@ -310,6 +316,16 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.MaxFetchedSeriesPerQuery, "querier.max-fetched-series-per-query", 0, "The maximum number of unique series for which a query can fetch samples from each ingesters and blocks storage. This limit is enforced in the querier, ruler and store-gateway. 0 to disable") f.IntVar(&l.MaxFetchedChunkBytesPerQuery, "querier.max-fetched-chunk-bytes-per-query", 0, "Deprecated (use max-fetched-data-bytes-per-query instead): The maximum size of all chunks in bytes that a query can fetch from each ingester and storage. This limit is enforced in the querier, ruler and store-gateway. 0 to disable.") f.IntVar(&l.MaxFetchedDataBytesPerQuery, "querier.max-fetched-data-bytes-per-query", 0, "The maximum combined size of all data that a query can fetch from each ingester and storage. This limit is enforced in the querier and ruler for `query`, `query_range` and `series` APIs. 0 to disable.") + + _ = l.QueryIngestersWithin.Set("0") + f.Var(&l.QueryIngestersWithin, "limits.query-ingesters-within", "Maximum lookback duration for querying data from ingesters. Queries for data older than this will only query the long-term storage. This is a per-tenant limit that can be overridden in the runtime configuration. Should be less than close-idle-tsdb-timeout.") + + _ = l.QueryStoreAfter.Set("0") + f.Var(&l.QueryStoreAfter, "limits.query-store-after", "Minimum age of data before querying the long-term storage. Queries for data younger than this will only query ingesters. 
This is a per-tenant limit that can be overridden in the runtime configuration.") + + _ = l.ShuffleShardingIngestersLookbackPeriod.Set("0") + f.Var(&l.ShuffleShardingIngestersLookbackPeriod, "limits.shuffle-sharding-ingesters-lookback-period", "Lookback period for shuffle sharding of ingesters. This is a per-tenant limit that can be overridden in the runtime configuration. Should be greater than or equal to query-store-after.") + f.Var(&l.MaxQueryLength, "store.max-query-length", "Limit the query time range (end - start time of range query parameter and max - min of data fetched time range). This limit is enforced in the query-frontend and ruler (on the received query). 0 to disable.") f.Var(&l.MaxQueryLookback, "querier.max-query-lookback", "Limit how long back data (series and metadata) can be queried, up until duration ago. This limit is enforced in the query-frontend, querier and ruler. If the requested time range is outside the allowed range, the request will not fail but will be manipulated to only query data within the allowed time range. 
0 to disable.") f.IntVar(&l.MaxQueryParallelism, "querier.max-query-parallelism", 14, "Maximum number of split queries will be scheduled in parallel by the frontend.") @@ -414,6 +430,28 @@ func (l *Limits) Validate(nameValidationScheme model.ValidationScheme, shardByAl return nil } +func (l *Limits) ValidateQueryLimits(userID string, closeIdleTSDBTimeout time.Duration) error { + queryIngestersWithin := time.Duration(l.QueryIngestersWithin) + queryStoreAfter := time.Duration(l.QueryStoreAfter) + shuffleShardingLookback := time.Duration(l.ShuffleShardingIngestersLookbackPeriod) + + if queryIngestersWithin > 0 && closeIdleTSDBTimeout > 0 && queryIngestersWithin >= closeIdleTSDBTimeout { + return fmt.Errorf("tenant %s: query_ingesters_within (%s) must be less than close_idle_tsdb_timeout (%s)", + userID, queryIngestersWithin, closeIdleTSDBTimeout) + } + + if queryIngestersWithin > 0 && queryStoreAfter > 0 && queryStoreAfter >= queryIngestersWithin { + return fmt.Errorf("tenant %s: query_store_after (%s) must be less than query_ingesters_within (%s)", + userID, queryStoreAfter, queryIngestersWithin) + } + + if queryStoreAfter > 0 && shuffleShardingLookback > 0 && shuffleShardingLookback < queryStoreAfter { + return fmt.Errorf("tenant %s: shuffle_sharding_ingesters_lookback_period (%s) is less than query_store_after (%s)", + userID, shuffleShardingLookback, queryStoreAfter) + } + + return nil +} // UnmarshalYAML implements the yaml.Unmarshaler interface. 
func (l *Limits) UnmarshalYAML(unmarshal func(any) error) error { @@ -1162,6 +1200,18 @@ func (o *Overrides) MaxTotalLabelValueLengthForUnoptimizedRegex(userID string) i return o.GetOverridesForUser(userID).MaxTotalLabelValueLengthForUnoptimizedRegex } +func (o *Overrides) QueryIngestersWithin(userID string) time.Duration { + return time.Duration(o.GetOverridesForUser(userID).QueryIngestersWithin) +} + +func (o *Overrides) QueryStoreAfter(userID string) time.Duration { + return time.Duration(o.GetOverridesForUser(userID).QueryStoreAfter) +} + +func (o *Overrides) ShuffleShardingIngestersLookbackPeriod(userID string) time.Duration { + return time.Duration(o.GetOverridesForUser(userID).ShuffleShardingIngestersLookbackPeriod) +} + // GetOverridesForUser returns the per-tenant limits with overrides. func (o *Overrides) GetOverridesForUser(userID string) *Limits { if o.tenantLimits != nil { diff --git a/pkg/util/validation/limits_test.go b/pkg/util/validation/limits_test.go index 807b3d8e2b4..a0b0b90e3c1 100644 --- a/pkg/util/validation/limits_test.go +++ b/pkg/util/validation/limits_test.go @@ -996,3 +996,274 @@ func TestIsLimitError(t *testing.T) { assert.False(t, IsLimitError(fmt.Errorf("test error"))) assert.True(t, IsLimitError(LimitError("test error"))) } +func TestLimits_ValidateQueryLimits(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + queryIngestersWithin time.Duration + queryStoreAfter time.Duration + shuffleShardingIngestersLookbackPeriod time.Duration + closeIdleTSDBTimeout time.Duration + expectedError string + }{ + "all limits disabled (zero values) should be valid": { + queryIngestersWithin: 0, + queryStoreAfter: 0, + shuffleShardingIngestersLookbackPeriod: 0, + closeIdleTSDBTimeout: 0, + expectedError: "", + }, + "valid configuration with all limits enabled": { + queryIngestersWithin: 25 * time.Hour, + queryStoreAfter: 24 * time.Hour, + shuffleShardingIngestersLookbackPeriod: 25 * time.Hour, + closeIdleTSDBTimeout: 26 * time.Hour, + 
expectedError: "", + }, + "valid configuration with overlap for safety": { + queryIngestersWithin: 25 * time.Hour, + queryStoreAfter: 23 * time.Hour, + shuffleShardingIngestersLookbackPeriod: 26 * time.Hour, + closeIdleTSDBTimeout: 30 * time.Hour, + expectedError: "", + }, + "valid configuration with only queryIngestersWithin enabled": { + queryIngestersWithin: 25 * time.Hour, + queryStoreAfter: 0, + shuffleShardingIngestersLookbackPeriod: 0, + closeIdleTSDBTimeout: 26 * time.Hour, + expectedError: "", + }, + "valid configuration with only queryStoreAfter enabled": { + queryIngestersWithin: 0, + queryStoreAfter: 24 * time.Hour, + shuffleShardingIngestersLookbackPeriod: 25 * time.Hour, + closeIdleTSDBTimeout: 0, + expectedError: "", + }, + "invalid: queryIngestersWithin >= closeIdleTSDBTimeout": { + queryIngestersWithin: 25 * time.Hour, + queryStoreAfter: 24 * time.Hour, + shuffleShardingIngestersLookbackPeriod: 25 * time.Hour, + closeIdleTSDBTimeout: 25 * time.Hour, + expectedError: "query_ingesters_within (25h0m0s) must be less than close_idle_tsdb_timeout (25h0m0s)", + }, + "invalid: queryIngestersWithin > closeIdleTSDBTimeout": { + queryIngestersWithin: 26 * time.Hour, + queryStoreAfter: 24 * time.Hour, + shuffleShardingIngestersLookbackPeriod: 26 * time.Hour, + closeIdleTSDBTimeout: 25 * time.Hour, + expectedError: "query_ingesters_within (26h0m0s) must be less than close_idle_tsdb_timeout (25h0m0s)", + }, + "invalid: queryStoreAfter >= queryIngestersWithin": { + queryIngestersWithin: 24 * time.Hour, + queryStoreAfter: 24 * time.Hour, + shuffleShardingIngestersLookbackPeriod: 25 * time.Hour, + closeIdleTSDBTimeout: 26 * time.Hour, + expectedError: "query_store_after (24h0m0s) must be less than query_ingesters_within (24h0m0s)", + }, + "invalid: queryStoreAfter > queryIngestersWithin": { + queryIngestersWithin: 24 * time.Hour, + queryStoreAfter: 25 * time.Hour, + shuffleShardingIngestersLookbackPeriod: 26 * time.Hour, + closeIdleTSDBTimeout: 27 * time.Hour, + 
expectedError: "query_store_after (25h0m0s) must be less than query_ingesters_within (24h0m0s)", + }, + "invalid: shuffleShardingLookback < queryStoreAfter": { + queryIngestersWithin: 25 * time.Hour, + queryStoreAfter: 24 * time.Hour, + shuffleShardingIngestersLookbackPeriod: 20 * time.Hour, + closeIdleTSDBTimeout: 26 * time.Hour, + expectedError: "shuffle_sharding_ingesters_lookback_period (20h0m0s) is less than query_store_after (24h0m0s)", + }, + "valid: shuffleShardingLookback between queryStoreAfter and queryIngestersWithin": { + queryIngestersWithin: 25 * time.Hour, + queryStoreAfter: 20 * time.Hour, + shuffleShardingIngestersLookbackPeriod: 22 * time.Hour, + closeIdleTSDBTimeout: 26 * time.Hour, + expectedError: "", + }, + "boundary: queryIngestersWithin exactly 1ms less than closeIdleTSDBTimeout": { + queryIngestersWithin: 25*time.Hour - time.Millisecond, + queryStoreAfter: 24 * time.Hour, + shuffleShardingIngestersLookbackPeriod: 25 * time.Hour, + closeIdleTSDBTimeout: 25 * time.Hour, + expectedError: "", + }, + "boundary: queryStoreAfter exactly 1ms less than queryIngestersWithin": { + queryIngestersWithin: 25 * time.Hour, + queryStoreAfter: 25*time.Hour - time.Millisecond, + shuffleShardingIngestersLookbackPeriod: 25 * time.Hour, + closeIdleTSDBTimeout: 26 * time.Hour, + expectedError: "", + }, + "boundary: shuffleShardingLookback exactly equal to queryStoreAfter": { + queryIngestersWithin: 25 * time.Hour, + queryStoreAfter: 24 * time.Hour, + shuffleShardingIngestersLookbackPeriod: 24 * time.Hour, + closeIdleTSDBTimeout: 26 * time.Hour, + expectedError: "", + }, + "edge case: very large values": { + queryIngestersWithin: 365 * 24 * time.Hour, // 1 year + queryStoreAfter: 364 * 24 * time.Hour, + shuffleShardingIngestersLookbackPeriod: 365 * 24 * time.Hour, + closeIdleTSDBTimeout: 366 * 24 * time.Hour, + expectedError: "", + }, + "edge case: very small values": { + queryIngestersWithin: 2 * time.Second, + queryStoreAfter: 1 * time.Second, + 
shuffleShardingIngestersLookbackPeriod: 2 * time.Second, + closeIdleTSDBTimeout: 3 * time.Second, + expectedError: "", + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + limits := Limits{ + QueryIngestersWithin: model.Duration(testData.queryIngestersWithin), + QueryStoreAfter: model.Duration(testData.queryStoreAfter), + ShuffleShardingIngestersLookbackPeriod: model.Duration(testData.shuffleShardingIngestersLookbackPeriod), + } + + err := limits.ValidateQueryLimits("test-tenant", testData.closeIdleTSDBTimeout) + + if testData.expectedError == "" { + assert.NoError(t, err, "expected no error but got: %v", err) + } else { + assert.Error(t, err, "expected error but got none") + if err != nil { + assert.Contains(t, err.Error(), testData.expectedError, "error message mismatch") + } + } + }) + } +} + +func TestQueryLimits_TenantOverrides(t *testing.T) { + t.Parallel() + + // Setup: Create three tenants with different query limit configurations + tenantLimits := map[string]*Limits{ + "tenant-a": { + QueryIngestersWithin: model.Duration(1 * time.Hour), + QueryStoreAfter: model.Duration(30 * time.Minute), + ShuffleShardingIngestersLookbackPeriod: model.Duration(1 * time.Hour), + }, + "tenant-b": { + QueryIngestersWithin: model.Duration(2 * time.Hour), + QueryStoreAfter: model.Duration(1 * time.Hour), + ShuffleShardingIngestersLookbackPeriod: model.Duration(2 * time.Hour), + }, + "tenant-c": { + // Uses defaults (all zeros - disabled) + QueryIngestersWithin: 0, + QueryStoreAfter: 0, + ShuffleShardingIngestersLookbackPeriod: 0, + }, + } + + defaults := Limits{ + QueryIngestersWithin: model.Duration(25 * time.Hour), + QueryStoreAfter: model.Duration(24 * time.Hour), + ShuffleShardingIngestersLookbackPeriod: model.Duration(25 * time.Hour), + } + + ov := NewOverrides(defaults, newMockTenantLimits(tenantLimits)) + + // Verify tenant-a gets their specific limits + assert.Equal(t, 1*time.Hour, ov.QueryIngestersWithin("tenant-a")) + 
assert.Equal(t, 30*time.Minute, ov.QueryStoreAfter("tenant-a")) + assert.Equal(t, 1*time.Hour, ov.ShuffleShardingIngestersLookbackPeriod("tenant-a")) + + // Verify tenant-b gets their specific limits + assert.Equal(t, 2*time.Hour, ov.QueryIngestersWithin("tenant-b")) + assert.Equal(t, 1*time.Hour, ov.QueryStoreAfter("tenant-b")) + assert.Equal(t, 2*time.Hour, ov.ShuffleShardingIngestersLookbackPeriod("tenant-b")) + + // Verify tenant-c gets their specific limits (zeros) + assert.Equal(t, time.Duration(0), ov.QueryIngestersWithin("tenant-c")) + assert.Equal(t, time.Duration(0), ov.QueryStoreAfter("tenant-c")) + assert.Equal(t, time.Duration(0), ov.ShuffleShardingIngestersLookbackPeriod("tenant-c")) + + // Verify unknown tenant gets defaults + assert.Equal(t, 25*time.Hour, ov.QueryIngestersWithin("tenant-unknown")) + assert.Equal(t, 24*time.Hour, ov.QueryStoreAfter("tenant-unknown")) + assert.Equal(t, 25*time.Hour, ov.ShuffleShardingIngestersLookbackPeriod("tenant-unknown")) +} + +func TestQueryLimits_TenantOverridesValidation(t *testing.T) { + t.Parallel() + + closeIdleTSDBTimeout := 26 * time.Hour + + tests := map[string]struct { + tenantLimits map[string]*Limits + tenantID string + expectedError string + }{ + "valid tenant configuration": { + tenantLimits: map[string]*Limits{ + "valid-tenant": { + QueryIngestersWithin: model.Duration(25 * time.Hour), + QueryStoreAfter: model.Duration(24 * time.Hour), + ShuffleShardingIngestersLookbackPeriod: model.Duration(25 * time.Hour), + }, + }, + tenantID: "valid-tenant", + expectedError: "", + }, + "invalid tenant: queryStoreAfter >= queryIngestersWithin": { + tenantLimits: map[string]*Limits{ + "invalid-tenant": { + QueryIngestersWithin: model.Duration(24 * time.Hour), + QueryStoreAfter: model.Duration(25 * time.Hour), + ShuffleShardingIngestersLookbackPeriod: model.Duration(26 * time.Hour), + }, + }, + tenantID: "invalid-tenant", + expectedError: "query_store_after (25h0m0s) must be less than query_ingesters_within 
(24h0m0s)", + }, + "invalid tenant: queryIngestersWithin >= closeIdleTSDBTimeout": { + tenantLimits: map[string]*Limits{ + "invalid-tenant": { + QueryIngestersWithin: model.Duration(26 * time.Hour), + QueryStoreAfter: model.Duration(24 * time.Hour), + ShuffleShardingIngestersLookbackPeriod: model.Duration(26 * time.Hour), + }, + }, + tenantID: "invalid-tenant", + expectedError: "query_ingesters_within (26h0m0s) must be less than close_idle_tsdb_timeout (26h0m0s)", + }, + "invalid tenant: shuffleShardingLookback < queryStoreAfter": { + tenantLimits: map[string]*Limits{ + "invalid-tenant": { + QueryIngestersWithin: model.Duration(25 * time.Hour), + QueryStoreAfter: model.Duration(24 * time.Hour), + ShuffleShardingIngestersLookbackPeriod: model.Duration(20 * time.Hour), + }, + }, + tenantID: "invalid-tenant", + expectedError: "shuffle_sharding_ingesters_lookback_period (20h0m0s) is less than query_store_after (24h0m0s)", + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + limits := testData.tenantLimits[testData.tenantID] + err := limits.ValidateQueryLimits(testData.tenantID, closeIdleTSDBTimeout) + + if testData.expectedError == "" { + assert.NoError(t, err, "expected no error but got: %v", err) + } else { + assert.Error(t, err, "expected error but got none") + if err != nil { + assert.Contains(t, err.Error(), testData.expectedError, "error message mismatch") + assert.Contains(t, err.Error(), testData.tenantID, "error should contain tenant ID") + } + } + }) + } +}