
Conversation

Contributor

@rubywtl rubywtl commented Aug 13, 2025

What this PR does:
This PR introduces a Fragmenter interface that splits logical query plans into fragments when distributed execution is enabled. The Fragmenter appends metadata to each fragment for tracking, which the scheduler then uses to route fragments to appropriate queriers. The scheduler maintains a mapping between fragments and querier addresses to track fragment locations across the distributed system.
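
Roughly, the interface has this shape (a minimal sketch; the type, field, and method names below are illustrative assumptions based on the description, not necessarily the exact code in this PR):

```go
package plan_fragments

import "github.com/thanos-io/promql-engine/logicalplan"

// Fragment is one executable piece of a logical plan plus the metadata the
// scheduler uses to route it and to link it back to its children.
type Fragment struct {
	Node       logicalplan.Node // the sub-plan this fragment executes
	QueryID    uint64
	FragmentID uint64
	ChildIDs   []uint64 // fragments whose results this fragment consumes
	IsRoot     bool
}

// Fragmenter splits a logical plan into fragments when distributed
// execution is enabled.
type Fragmenter interface {
	Fragment(node logicalplan.Node) ([]Fragment, error)
}
```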

Which issue(s) this PR fixes:
Related to proposal #6789

Checklist

  • Tests updated
  • Documentation added
  • CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX]

t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent
t.Cfg.Worker.TargetHeaders = t.Cfg.API.HTTPRequestHeadersToLog
return querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(internalQuerierRouter), util_log.Logger, prometheus.DefaultRegisterer)
ipAddr, err := ring.GetInstanceAddr(t.Cfg.Alertmanager.ShardingRing.InstanceAddr, t.Cfg.Alertmanager.ShardingRing.InstanceInterfaceNames, util_log.Logger)
Contributor

Why are we using the alertmanager config here?

Contributor Author

@rubywtl rubywtl Aug 14, 2025

The gRPC params I need are under the RingConfig struct (ShardingRing here), but that config doesn't exist under the querier.

[update] I will add a new field (ring configs) for the querier 👍

Contributor

Umm, I don't think we want to add a Ring for the querier. We just need the configuration for the addresses, interface names, etc.
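
Something like the following is presumably what's being suggested — just the address/interface fields, no ring (struct and field names are assumptions for illustration):

```go
// Hypothetical querier-side config carrying only what is needed to resolve
// the instance address, without embedding a full ring.
type DistributedExecConfig struct {
	InstanceAddr           string   `yaml:"instance_addr"`
	InstanceInterfaceNames []string `yaml:"instance_interface_names"`
}

// Usage would then mirror the alertmanager call quoted above:
//   addr, err := ring.GetInstanceAddr(cfg.InstanceAddr, cfg.InstanceInterfaceNames, util_log.Logger)
```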

defer f.mu.Unlock()

keysToDelete := make([]distributed_execution.FragmentKey, 0)
for key := range f.mappings {
Contributor

Looking at the methods you have, would it be easier to change mappings from map[distributed_execution.FragmentKey]string to map[uint64]map[uint64]string?

You can then find the map for a query with a single lookup.

Contributor Author

@rubywtl rubywtl Aug 14, 2025

True, but I made the FragmentKey struct so that it is easier to maintain (for example: if we ever want to change the types for the IDs or add more fields, we don't have to go through the codebase to fix it), and the code will be easier to understand (more literal). This fragment key type is also reused for remote nodes and for child-root execution accesses to the result cache in future PRs.
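
For reference, the two shapes being discussed (a rough sketch; field names are assumptions, and the real table is guarded by a mutex):

```go
// Struct-keyed table, as implemented in this PR: the key type can grow new
// fields or change ID types without touching every call site.
type FragmentKey struct {
	QueryID    uint64
	FragmentID uint64
}

type FragmentTable struct {
	mappings map[FragmentKey]string // fragment -> querier address
}

// Nested-map alternative suggested above: clearing a whole query becomes a
// single delete by queryID, at the cost of a less self-describing key.
//   mappings map[uint64]map[uint64]string // queryID -> fragmentID -> address
```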


import "github.com/thanos-io/promql-engine/logicalplan"

type Fragmenter interface {
Contributor

It is better to move the Fragmenter to distributed_execution as fragmentation is specific to remote distribution.

The fragment table can just be moved to the scheduler folder.

@rubywtl rubywtl force-pushed the scheduler/logicalplan_fragment_coordination branch 4 times, most recently from 89e8021 to 67eff93 on August 21, 2025 21:39
}
}

func (f *FragmentTable) AddMapping(queryID uint64, fragmentID uint64, addr string) {
Contributor

Can we find a better name than mapping?

return "", false
}

func (f *FragmentTable) ClearMappings(queryID uint64) {
Contributor

ditto


// queryKey <--> fragment-ids lookup table allows faster cancellation of the whole query
// compared to traversing through the pending requests to find matching fragments
queryToFragmentsLookUp map[queryKey][]uint64
Contributor

Let's find a better name for this. LookUp is not the right name.


type requestKey struct {
// additional layer to improve efficiency of deleting fragments of logical query plans
// while maintaining previous logics
Contributor

The comment is confusing. Which previous logic does this struct maintain?

Contributor Author

The previous logic cancels a query by its queryID and frontend address. Now there are multiple fragments under one queryID, and traversing the pending request queue to check each queryID is inefficient, so I added an extra layer of mapping that keeps track of the fragment IDs under the same queryKey.
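
A minimal sketch of that extra layer (names follow the snippets in this thread where possible; everything else is assumed for illustration):

```go
type queryKey struct {
	frontendAddr string
	queryID      uint64
}

type requestKey struct {
	queryKey   queryKey
	fragmentID uint64
}

type schedulerRequest struct{ /* fragment payload, elided */ }

type pendingRequests struct {
	requests map[requestKey]*schedulerRequest
	// extra layer: every fragment ID belonging to a query, so cancelling the
	// query does not require scanning all pending requests
	queryToFragments map[queryKey][]uint64
}

func (p *pendingRequests) cancelQuery(k queryKey) {
	for _, fragID := range p.queryToFragments[k] {
		delete(p.requests, requestKey{queryKey: k, fragmentID: fragID})
	}
	delete(p.queryToFragments, k)
}
```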

for _, childID := range req.fragment.ChildIDs {
addr, ok := s.fragmentTable.GetChildAddr(req.queryID, childID)
if !ok {
return
Contributor

Do we need to return an error here if the child addr is missing?

Contributor

I'm a bit confused as I don't see any s.fragmentTable.AddAddressByID before this check. Could you clarify when you expect us to add items to the table before we get here?

Contributor Author

AddAddressByID is called in forwardRequestToQuerier, after fragments are picked up by queriers.

This ensures that when a querier successfully picks up a fragment, its address and fragment ID are recorded in the address table. Since we process fragments in child-to-parent order, parent fragments (which are scheduled later) can reliably look up their child fragment addresses using the fragment IDs when they need to fetch results. This ordering guarantees that child fragment addresses are already available when parent fragments need them.
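
In other words, the flow looks roughly like this (stand-in types; only AddAddressByID and GetChildAddr follow the names used in the discussion):

```go
type fragmentTable struct {
	addrs map[uint64]map[uint64]string // queryID -> fragmentID -> querier address
}

// Called from forwardRequestToQuerier once a querier picks the fragment up.
func (t *fragmentTable) AddAddressByID(queryID, fragmentID uint64, addr string) {
	if t.addrs[queryID] == nil {
		t.addrs[queryID] = map[uint64]string{}
	}
	t.addrs[queryID][fragmentID] = addr
}

func (t *fragmentTable) GetChildAddr(queryID, fragmentID uint64) (string, bool) {
	addr, ok := t.addrs[queryID][fragmentID]
	return addr, ok
}

// Because fragments are forwarded child-to-parent, every ChildID already has
// an address recorded by the time the parent fragment is scheduled.
func childAddrs(t *fragmentTable, queryID uint64, childIDs []uint64) ([]string, bool) {
	out := make([]string, 0, len(childIDs))
	for _, id := range childIDs {
		addr, ok := t.GetChildAddr(queryID, id)
		if !ok {
			return nil, false
		}
		out = append(out, addr)
	}
	return out, true
}
```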

Contributor

Ah nice, child-to-parent order was what I was missing

@rubywtl rubywtl force-pushed the scheduler/logicalplan_fragment_coordination branch 12 times, most recently from beeb9a5 to c81acda on September 4, 2025 19:06

// Enables or disables distributed query execution functionality
distributedExecEnabled bool
fragmenter plan_fragments.DummyFragmenter // Splits logical plans into executable fragments
Contributor

Is the type meant to be Fragmenter?

Comment on lines 376 to 382
// if there is an error while enqueueing any of the fragments,
// immediately propagate the error back
if err != nil {
return err
}

return nil
Contributor

nit: simplify to return err?

@rubywtl rubywtl force-pushed the scheduler/logicalplan_fragment_coordination branch from c81acda to 06acad2 on September 5, 2025 16:53
Contributor

@justinjung04 justinjung04 left a comment

Looks great, thank you

@rubywtl rubywtl force-pushed the scheduler/logicalplan_fragment_coordination branch from 06acad2 to 9a5c0af on September 5, 2025 18:35
Contributor

@yeya24 yeya24 left a comment

Thanks! Great work

@yeya24 yeya24 merged commit eab2099 into cortexproject:master Sep 5, 2025
18 checks passed