
Conversation

Ethereal-O
Contributor

Purpose of the PR

  • Design a new scheduler

Main Changes

  • Support priority/elder/depends based scheduling.

Verifying these changes

  • Trivial rework / code cleanup without any test coverage. (No Need)
  • Already covered by existing tests, such as (please modify tests here).
  • Need tests and can be verified as follows.

I have written some scripts in the /test folder.

Does this PR potentially affect the following parts?

  • Nope
  • Dependencies (add/update license info)
  • Modify configurations
  • The public API
  • Other effects (typed here)

Documentation Status

  • Doc - TODO
  • Doc - Done
  • Doc - No Need

@dosubot dosubot bot added size:XXL This PR changes 1000+ lines, ignoring generated files. feature New feature labels Sep 26, 2025
@imbajin imbajin requested a review from Copilot September 26, 2025 10:09

@Copilot Copilot AI left a comment


Pull Request Overview

This PR introduces a comprehensive scheduler system with priority, dependency, and cron-based scheduling capabilities. The new scheduler replaces the previous simple queue-based system with a sophisticated task management framework supporting advanced scheduling algorithms.

Key changes:

  • New scheduler architecture with modular components for algorithm, resource, task, and cron management
  • Priority-based scheduling with support for task dependencies (preorders)
  • Cron expression support for recurring tasks
  • Worker group management and resource allocation improvements

Reviewed Changes

Copilot reviewed 40 out of 41 changed files in this pull request and generated 5 comments.

Summary per file:

  • vermeer/test/scheduler/*.go: Test framework for scheduler functionality, including priority, dependency, and cron testing
  • vermeer/apps/master/schedules/*.go: Core scheduler implementation with algorithm, resource, task, and cron managers
  • vermeer/client/*.go: Client API extensions for batch operations and task sequence tracking
  • vermeer/apps/structure/task.go: Task structure enhancements for scheduler fields
  • vermeer/apps/master/bl/*.go: Business logic updates to integrate with the new scheduler
  • vermeer/config/*.ini: Configuration additions for scheduler parameters


@imbajin
Member

imbajin commented Oct 11, 2025

Summary

This PR introduces a comprehensive scheduler redesign with priority-based scheduling, task dependencies, and cron support. The architecture is well-designed, but there are critical concurrency issues that must be addressed before merging.

Verdict: REQUEST CHANGES ⚠️


Architecture Changes

Major Refactoring ✅

The PR completely redesigns the scheduler from a simple queue-based system to a sophisticated multi-component architecture:

Old Architecture:

  • Simple SpaceQueue (FIFO)
  • Basic Broker for worker assignment
  • Single-threaded dispatch loop

New Architecture:

  • SchedulerResourceManager - Worker pool management
  • SchedulerTaskManager - Task lifecycle, dependencies, priorities
  • SchedulerAlgorithmManager - Scheduling algorithm logic
  • SchedulerCronManager - Recurring task support
  • Event-driven scheduling with ticker-based dispatch

New Features:

  1. ✅ Priority-based scheduling with integer priorities
  2. ✅ Task dependency management (preorders)
  3. ✅ Cron expressions for recurring tasks
  4. ✅ Exclusive/concurrent execution modes
  5. ✅ Configurable soft scheduling

Critical Issues 🚨

1. TOCTTOU Race Condition (scheduler_bl.go:421-423)

Severity: CRITICAL

// TODO: Is here need a lock? TOCTTOU
if taskInfo.State != structure.TaskStateWaiting {
    logrus.Errorf("task state is not in 'Waiting' state, taskID: %v", taskInfo)
    return
}

The TODO correctly identifies a Time-Of-Check-Time-Of-Use vulnerability. State check and operations are not atomic.

Fix:

defer taskInfo.Unlock(taskInfo.Lock())
if taskInfo.State != structure.TaskStateWaiting {
    // ...
}

2. Resource Leak: Channel Not Closed (scheduler_bl.go:62)

Severity: HIGH

The startChan is never closed, causing potential goroutine leaks during shutdown.

Fix: Add cleanup:

func (s *ScheduleBl) Shutdown() {
    close(s.startChan)
}
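For context, a minimal sketch of how the dispatch goroutine could observe the closed channel and exit cleanly; the loop shape and the ticker/startTask names below are assumptions for illustration, not the project's actual code:

func (s *ScheduleBl) dispatchLoop() {
    for {
        select {
        case taskInfo, ok := <-s.startChan:
            if !ok {
                logrus.Info("startChan closed, dispatch loop exiting")
                return // without this, the goroutine leaks after Shutdown
            }
            s.startTask(taskInfo) // hypothetical handler for a queued task
        case <-s.ticker.C:
            s.TryScheduleNextTasks()
        }
    }
}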

3. Dependency State Validation Missing (scheduler_bl.go:234-244)

Severity: HIGH

No check if dependency tasks are completed. Tasks could depend on failed/canceled tasks.

Fix:

if depTask.State != structure.TaskStateComplete {
    return false, fmt.Errorf("dependency task %d is not complete (state: %s)", depTaskID, depTask.State)
}

4. Potential Deadlock Pattern (scheduler_bl.go:97-99)

Severity: HIGH

Multiple functions acquire locks then call TryScheduleNextTasks, which conditionally locks. This pattern is deadlock-prone.

Fix: Use a consistent locking strategy; consider sync.RWMutex for read-heavy operations.
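A minimal sketch of one such strategy, assuming a single scheduling goroutine (field and method names are illustrative, not the project's actual API): guard shared state with a sync.RWMutex, and never call TryScheduleNextTasks while holding the lock; signal the scheduling goroutine instead.

type ScheduleBl struct {
    mu        sync.RWMutex
    needSched chan struct{} // buffered with size 1 to coalesce schedule requests
    // ...
}

// Read-heavy inspections take only the read lock.
func (s *ScheduleBl) waitingCount() int {
    s.mu.RLock()
    defer s.mu.RUnlock()
    return len(s.waitingTasks) // hypothetical field holding queued tasks
}

// Callers never invoke TryScheduleNextTasks while holding mu; they only signal
// the scheduling goroutine, which acquires mu itself.
func (s *ScheduleBl) requestSchedule() {
    select {
    case s.needSched <- struct{}{}:
    default: // a scheduling pass is already pending
    }
}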


High Priority Issues ⚠️

5. Nil Pointer Risk (task_bl.go:87-92)

If Scheduler.cronManager is nil (initialization order issue), this will panic:

if err := Scheduler.cronManager.CheckCronExpression(cronExpr); err != nil {

Add nil checks.
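A minimal defensive sketch against the call shown above (how the error is returned depends on the enclosing function's signature):

if Scheduler.cronManager == nil {
    return fmt.Errorf("scheduler cron manager is not initialized")
}
if err := Scheduler.cronManager.CheckCronExpression(cronExpr); err != nil {
    return fmt.Errorf("invalid cron expression %q: %w", cronExpr, err)
}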

6. API Breaking Change (scheduler_bl.go:502)

The CloseCurrent signature changed from (taskId int32) to (taskId int32, removeWorkerName ...string). Document this breaking change and verify that all call sites are updated.
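Note that adding a variadic parameter keeps plain call sites source-compatible, so the migration burden falls mainly on any func-typed variables or interface definitions that reference the old signature. A quick illustration (the calls are hypothetical; only the signatures come from this review):

// Existing single-argument call sites still compile unchanged:
Scheduler.CloseCurrent(taskID)

// New call sites can additionally name the worker to remove:
Scheduler.CloseCurrent(taskID, "worker-group-1")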

7. Commented Out Critical Code (scheduler_bl.go:229)

//defer s.Unlock(s.Lock())

Critical locking code is commented out without explanation, which is dangerous.

8. Incomplete Implementation (scheduler_bl.go:159)

// TODO: NEED TO JUDGE IF THE TASK CAN CONCURRENTLY RUNNING
// NOT only by user setting, but also by scheduler setting

Important concurrent execution logic deferred.


Medium Priority Issues

9. Missing Input Validation (task_bl.go:71-79)

No upper bound on priority values. Large priorities could cause unexpected behavior.
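A minimal validation sketch; the bounds themselves are placeholders, not values taken from the project:

const (
    minTaskPriority = 0
    maxTaskPriority = 1000 // placeholder upper bound
)

if priority < minTaskPriority || priority > maxTaskPriority {
    return fmt.Errorf("priority %d out of range [%d, %d]", priority, minTaskPriority, maxTaskPriority)
}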

10. Silent Error Handling (scheduler_bl.go:441-443)

if err := s.taskManager.AddTaskStartSequence(taskInfo.ID); err != nil {
    logrus.Errorf("failed to add task '%d' to start sequence: %v", taskInfo.ID, err)
}

Errors logged but not handled.
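One way to surface the failure instead of only logging it, sketched against the snippet above (whether the enclosing function can return an error, and whether taskMgr.SetError is the right reaction here, are assumptions):

if err := s.taskManager.AddTaskStartSequence(taskInfo.ID); err != nil {
    logrus.Errorf("failed to add task '%d' to start sequence: %v", taskInfo.ID, err)
    taskMgr.SetError(taskInfo, err.Error()) // mark the task failed rather than continuing silently
    return err
}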


CI/CD Status

Failed Checks ❌

  1. check-license-header - Missing Apache license headers in new files
  2. dependency-review - New dependencies need review

Passed Checks ✅

  • CodeQL (Go & Java)
  • Dependency license check

Test Coverage ⚠️

  • PR mentions tests in /test folder but not visible in diff
  • Need unit tests for:
    • Priority scheduling algorithm
    • Dependency resolution
    • Cron task scheduling
    • Concurrent execution

Recommendations

Must Fix Before Merge:

  1. Fix TOCTTOU race condition with atomic state checking
  2. Add resource cleanup (close channels on shutdown)
  3. Validate dependency task states
  4. Fix failing CI checks (license headers, dependency review)
  5. Review and fix all locking patterns
  6. Add nil pointer checks for managers

Should Fix:

  1. Complete TODO implementations (concurrent execution logic)
  2. Document API breaking changes
  3. Add comprehensive test coverage
  4. Add input validation (priority bounds)
  5. Remove commented code with explanation

Documentation Needed:

  1. Migration guide for scheduler API changes
  2. Configuration documentation (start_chan_size, ticker_interval, soft_schedule)
  3. Cron expression format specification (see the sketch below)
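For the cron format documentation, a small self-contained sketch of validating a standard 5-field expression; it assumes a robfig/cron-style parser, which may not be the library the project actually uses:

package main

import (
    "fmt"
    "time"

    "github.com/robfig/cron/v3" // assumption: the scheduler relies on a robfig/cron-style parser
)

func main() {
    // Standard 5-field spec: minute hour day-of-month month day-of-week
    schedule, err := cron.ParseStandard("*/5 * * * *")
    if err != nil {
        fmt.Println("invalid cron expression:", err)
        return
    }
    fmt.Println("next run:", schedule.Next(time.Now()))
}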

Positive Aspects 👍

  1. ✅ Well-architected component separation
  2. ✅ Good configuration externalization
  3. ✅ Comprehensive feature set
  4. ✅ Improved observability with detailed logging
  5. ✅ Added Docker support documentation

Risk Assessment

Risk Level: MEDIUM-HIGH due to concurrency issues in core scheduling logic

Estimated Time to Fix: 2-3 days for critical fixes, 1 week for comprehensive improvements

The architecture and features are excellent, but the critical concurrency issues must be resolved before this can be safely merged to production.

}
}()

// TODO: Is here need a lock? TOCTTOU
Member


Race condition risk in task state checking

There's a TOCTTOU (time-of-check to time-of-use) issue here. The in-code TODO acknowledges the concern, but it should be addressed: between checking the task state and starting the task, another goroutine could modify the state.

Suggested fix:

// Acquire lock before checking and starting
s.Lock()
defer s.Unlock(nil)

if taskInfo.State != structure.TaskStateWaiting {
    logrus.Errorf("task state is not in 'Waiting' state, taskID: %v", taskInfo)
    return
}

taskStarter, err := NewTaskStarter(taskInfo, agent)
if err != nil {
    logrus.Errorf("failed to create new TaskStarter err: %v", err)
    taskMgr.SetError(taskInfo, err.Error())
    return
}

taskInfo.StartTime = time.Now()
err = taskStarter.StartTask()

Contributor Author


This seems to need more time: it would require changing all the APIs around task.setstate/taskmgr.setstate.

return nil
}
logrus.Debugf("all tasks: %d, workerGroups: %d/%d", len(allTasks), len(idleWorkerGroups), len(concurrentWorkerGroups))

Member


Commented TODO needs resolution

This TODO indicates an important design decision that's unresolved. The scheduler needs to determine if tasks can run concurrently based on both user settings AND scheduler policies.

Action required:

  1. Either implement the logic to check concurrent execution capability
  2. Create a tracking issue for this work and reference it in the comment
  3. Document the temporary behavior and its limitations

Example:

// TODO(issue #XXX): Implement comprehensive concurrent execution check
// Currently only checks user settings. Need to add:
// 1. Resource availability check
// 2. Conflict detection with running tasks
// 3. Worker group capacity limits
// Temporary behavior: Tasks marked as non-exclusive may still be blocked
// if resources are insufficient
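A minimal sketch of the combined check described above; the type, field, and method names are assumptions for illustration only:

func (s *ScheduleBl) canRunConcurrently(taskInfo *structure.TaskInfo, group *WorkerGroup) bool {
    // 1. user setting: the task itself must not be marked exclusive
    if taskInfo.Exclusive {
        return false
    }
    // 2. scheduler setting: the global policy must allow concurrent scheduling
    if !s.softSchedule {
        return false
    }
    // 3. resource check: the worker group must still have capacity
    return group.RunningTaskCount() < group.MaxConcurrentTasks()
}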

@Ethereal-O Ethereal-O force-pushed the formal_submit_apache branch from 4fc8b45 to eda7af8 on October 18, 2025 08:00
logrus.Debugf("no available tasks or workerGroups, allTasks: %d, workerGroups: %d/%d",
len(allTasks), len(idleWorkerGroups), len(concurrentWorkerGroups))
return nil
}
Member


Serious logic error: using append to add elements to a pre-allocated slice returns incorrect results.

Current problem:

errors := make([]error, len(taskInfos))  // creates len nil elements
oks := make([]bool, len(taskInfos))      // creates len false elements
// ... inside the loop
errors = append(errors, err)  // appends to the end, resulting in 2*len elements
oks = append(oks, ok)

Result: the returned slices have length 2*len, and the first len elements are all zero values.

Fix option 1 - allocate with capacity only:

errors := make([]error, 0, len(taskInfos))
oks := make([]bool, 0, len(taskInfos))

Fix option 2 - assign by index:

for i, taskInfo := range taskInfos {
    ok, err := s.QueueTask(taskInfo)
    errors[i] = err
    oks[i] = ok
}

logrus.Debugf("all tasks: %d, workerGroups: %d/%d", len(allTasks), len(idleWorkerGroups), len(concurrentWorkerGroups))

// TODO: NEED TO JUDGE IF THE TASK CAN CONCURRENTLY RUNNING
// NOT only by user setting, but also by scheduler setting
Member


Inconsistent error handling: when QueueTask fails, the error is logged but the function keeps executing. This can leave some tasks queued successfully and others failed, with no way for the caller to tell which ones failed.

Suggestions:

  1. Consider stopping at the first error and rolling back the tasks that were already queued
  2. Or at least clearly indicate in the return values which tasks failed
  3. Use a transactional batch operation to guarantee atomicity

Example improvement:

// track the tasks that have been queued successfully
successTasks := make([]*structure.TaskInfo, 0, len(taskInfos))
for _, taskInfo := range taskInfos {
    _, err := s.QueueTask(taskInfo) // only the error is checked in this rollback variant
    if err != nil {
        // roll back the tasks that already succeeded
        for _, t := range successTasks {
            s.taskManager.RemoveTask(t.ID)
        }
        return false, fmt.Errorf("failed to queue task %d: %w", taskInfo.ID, err)
    }
    successTasks = append(successTasks, taskInfo)
}

return false, err
}

logrus.Debugf("queuing task %d with parameters: %+v", taskInfo.ID, taskInfo)
Member


Insufficient validation: the dependency-task validation only checks whether the task exists, but does not check:

  1. The state of the dependency task (completed, failed, etc.)
  2. Whether a circular dependency exists
  3. Whether the depth of the dependency chain is reasonable

Suggestion:

for _, depTaskID := range taskInfo.Preorders {
    depTask := taskMgr.GetTaskByID(depTaskID)
    if depTask == nil {
        return false, fmt.Errorf("dependency task %d does not exist", depTaskID)
    }

    // check the dependency task's state
    if depTask.State == structure.TaskStateFailed {
        return false, fmt.Errorf("dependency task %d has failed", depTaskID)
    }

    // check for circular dependencies
    if hasCircularDependency(taskInfo.ID, depTaskID) {
        return false, errors.New("circular dependency detected")
    }

    // check space consistency
    if depTask.SpaceName != taskInfo.SpaceName {
        return false, errors.New("dependency task must be in the same space")
    }
}
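The hasCircularDependency helper called in the snippet above is not defined there; one possible shape is an iterative walk over each dependency's own Preorders (the taskMgr lookup and the Preorders field type are assumptions):

func hasCircularDependency(newTaskID, depTaskID int32) bool {
    visited := make(map[int32]bool)
    stack := []int32{depTaskID}
    for len(stack) > 0 {
        id := stack[len(stack)-1]
        stack = stack[:len(stack)-1]
        if id == newTaskID {
            return true // the dependency chain leads back to the new task
        }
        if visited[id] {
            continue
        }
        visited[id] = true
        if t := taskMgr.GetTaskByID(id); t != nil {
            stack = append(stack, t.Preorders...)
        }
    }
    return false
}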

s.spaceQueue = (&schedules.SpaceQueue{}).Init()
s.broker = (&schedules.Broker{}).Init()
s.startChanSize = chanSizeInt

Member


Hard-coded configuration value: the string literal "3" is used as the default here; it should be defined as a constant.

Suggestion:

const (
    defaultStartChanSize = "10"
    defaultMaxRunningTasks = "3"
    defaultCheckWaitingInterval = "5"
)

// then at the usage site
maxRunningTasks := options.GetInt(s.Name+".max_running_tasks", defaultMaxRunningTasks)

This would:

  1. Improve code maintainability
  2. Avoid magic numbers
  3. Make it easy to change default values in one place

services.SetAdminRouters(sen, auth.TokenFilter, auth.AdminFilter)
services.SetUI(sen)
logrus.Info("token-auth was activated")
default:
Member


Code style: there is an empty default branch here, followed by a redundant blank line. Either remove it or add a comment explaining why the empty default branch is needed.

Suggestion:

default:
    // No authentication required for other routes

Or, if the default branch is not needed, simply delete it.

preorderList := strings.Split(preorders, ",")
for _, preorder := range preorderList {
if pid, err := strconv.ParseInt(preorder, 10, 32); err == nil {
if taskMgr.GetTaskByID(int32(pid)) == nil {
Member


Error message internationalization: the error messages mix Chinese and English. Either use English consistently, or implement full internationalization support.

Example:

// current
return nil, fmt.Errorf("preorder task id %d not exists", depTaskID)

// suggested: consistent, grammatical English
return nil, fmt.Errorf("preorder task id %d does not exist", depTaskID)

Or implement an i18n framework:

return nil, i18n.Errorf("error.task.dependency_not_found", depTaskID)

