React Router

调度系统入门

深入理解 React Scheduler 调度器的实现原理,包括优先级系统、时间切片和任务调度机制

调度系统入门

Scheduler 是 React 的调度器,负责管理任务的优先级和调度。它是 React 实现并发渲染的关键组件。本文将深入分析 Scheduler 的实现原理。

🎯 核心概念

什么是 Scheduler?

Scheduler 是 React 的调度器,它的主要职责包括:

  1. 任务调度:根据优先级调度任务
  2. 时间切片:将长时间任务分割成小块
  3. 优先级管理:管理不同任务的优先级
  4. 饥饿问题解决:防止低优先级任务饿死

优先级系统

React 定义了 5 个优先级等级:

// packages/scheduler/src/SchedulerPriorities.js

export const NoPriority = 0;
export const ImmediatePriority = 1;
export const UserBlockingPriority = 2;
export const NormalPriority = 3;
export const LowPriority = 4;
export const IdlePriority = 5;

🔍 源码实现分析

1. 优先级系统实现

// packages/scheduler/src/Scheduler.js

const maxSigned31BitInt = 1073741823;

// 优先级对应的过期时间
const IMMEDIATE_PRIORITY_TIMEOUT = -1;
const USER_BLOCKING_PRIORITY_TIMEOUT = 250;
const NORMAL_PRIORITY_TIMEOUT = 5000;
const LOW_PRIORITY_TIMEOUT = 10000;
const IDLE_PRIORITY_TIMEOUT = maxSigned31BitInt;

// 优先级到过期时间的映射
const timeoutTable = {
  [ImmediatePriority]: IMMEDIATE_PRIORITY_TIMEOUT,
  [UserBlockingPriority]: USER_BLOCKING_PRIORITY_TIMEOUT,
  [NormalPriority]: NORMAL_PRIORITY_TIMEOUT,
  [LowPriority]: LOW_PRIORITY_TIMEOUT,
  [IdlePriority]: IDLE_PRIORITY_TIMEOUT,
};

function timeoutForPriority(priorityLevel: PriorityLevel): number {
  return timeoutTable[priorityLevel];
}

2. 任务队列管理

// packages/scheduler/src/Scheduler.js

// 任务队列(小顶堆)
const taskQueue: Array<Task> = [];
const timerQueue: Array<Task> = [];

// 任务结构
type Task = {
  id: number,
  callback: Callback | null,
  priorityLevel: PriorityLevel,
  startTime: number,
  expirationTime: number,
  sortIndex: number,
};

// 添加任务到队列
function push(heap: Array<Task>, node: Task): void {
  const index = heap.length;
  heap.push(node);
  siftUp(heap, node, index);
}

// 从队列中取出任务
function pop(heap: Array<Task>): Task | null {
  if (heap.length === 0) {
    return null;
  }
  const first = heap[0];
  const last = heap.pop();
  
  if (last !== first) {
    heap[0] = last;
    siftDown(heap, last, 0);
  }
  
  return first;
}

// 查看队列顶部的任务
function peek(heap: Array<Task>): Task | null {
  return heap.length === 0 ? null : heap[0];
}

3. 时间切片实现

// packages/scheduler/src/Scheduler.js

// 时间切片配置
const FRAME_TIMEOUT = 5; // 5ms
const FRAME_INTERVAL = 16; // 16ms (60fps)

let isHostCallbackScheduled = false;
let isPerformingWork = false;

// 主工作循环
function workLoop(hasTimeRemaining: boolean, initialTime: number) {
  let currentTime = initialTime;
  advanceTimers(currentTime);
  currentTask = peek(taskQueue);
  
  while (currentTask !== null && !isHostCallbackScheduled) {
    if (currentTask.expirationTime > currentTime) {
      if (!hasTimeRemaining) {
        // 时间片用完,让出控制权
        break;
      }
    } else {
      const callback = currentTask.callback;
      if (typeof callback === 'function') {
        currentTask.callback = null;
        currentPriorityLevel = currentTask.priorityLevel;
        const didUserCallbackTimeout = currentTask.expirationTime <= currentTime;
        
        const continuationCallback = callback(didUserCallbackTimeout);
        currentTime = getCurrentTime();
        
        if (typeof continuationCallback === 'function') {
          currentTask.callback = continuationCallback;
        } else {
          if (currentTask === peek(taskQueue)) {
            pop(taskQueue);
          }
        }
        advanceTimers(currentTime);
      } else {
        pop(taskQueue);
      }
    }
    currentTask = peek(taskQueue);
  }
  
  if (currentTask !== null) {
    return true; // 还有任务需要处理
  } else {
    const firstTimer = peek(timerQueue);
    if (firstTimer !== null) {
      requestHostTimeout(handleTimeout, firstTimer.startTime - currentTime);
    }
    return false;
  }
}

4. 任务调度

// packages/scheduler/src/Scheduler.js

// 调度任务
function scheduleCallback(
  priorityLevel: PriorityLevel,
  callback: Callback,
  options?: {delay: number},
): Task {
  const currentTime = getCurrentTime();
  
  let startTime;
  if (typeof options === 'object' && options !== null) {
    const delay = options.delay;
    if (typeof delay === 'number' && delay > 0) {
      startTime = currentTime + delay;
    } else {
      startTime = currentTime;
    }
  } else {
    startTime = currentTime;
  }
  
  let expirationTime;
  switch (priorityLevel) {
    case ImmediatePriority:
      expirationTime = startTime + 1;
      break;
    case UserBlockingPriority:
      expirationTime = startTime + 250;
      break;
    case IdlePriority:
      expirationTime = startTime + 1073741823;
      break;
    case LowPriority:
      expirationTime = startTime + 10000;
      break;
    case NormalPriority:
    default:
      expirationTime = startTime + 5000;
  }
  
  const newTask: Task = {
    id: taskIdCounter++,
    callback,
    priorityLevel,
    startTime,
    expirationTime,
    sortIndex: expirationTime,
  };
  
  if (startTime > currentTime) {
    // 延迟任务
    newTask.sortIndex = startTime;
    push(timerQueue, newTask);
    if (peek(taskQueue) === null && newTask === peek(timerQueue)) {
      if (isHostTimeoutScheduled) {
        cancelHostTimeout();
      } else {
        isHostTimeoutScheduled = true;
      }
      requestHostTimeout(handleTimeout, startTime - currentTime);
    }
  } else {
    // 立即执行的任务
    newTask.sortIndex = expirationTime;
    push(taskQueue, newTask);
    if (!isHostCallbackScheduled && !isPerformingWork) {
      isHostCallbackScheduled = true;
      requestHostCallback(flushWork);
    }
  }
  
  return newTask;
}

5. 主机回调实现

// packages/scheduler/src/forks/SchedulerHostConfig.default.js

// 主机回调的实现
let scheduledHostCallback: ((boolean, number) => boolean) | null = null;
let isMessageLoopRunning = false;
let taskTimeoutID: number = -1;

// 请求主机回调
function requestHostCallback(callback: (boolean, number) => boolean) {
  scheduledHostCallback = callback;
  if (!isMessageLoopRunning) {
    isMessageLoopRunning = true;
    schedulePerformWorkUntilDeadline();
  }
}

// 取消主机回调
function cancelHostCallback() {
  scheduledHostCallback = null;
}

// 调度工作直到截止时间
function schedulePerformWorkUntilDeadline() {
  if (typeof localSetImmediate === 'function') {
    localSetImmediate(performWorkUntilDeadline);
  } else if (typeof MessageChannel !== 'undefined') {
    const channel = new MessageChannel();
    const port = channel.port2;
    channel.port1.onmessage = performWorkUntilDeadline;
    port.postMessage(null);
  } else {
    localSetTimeout(performWorkUntilDeadline, 0);
  }
}

// 执行工作直到截止时间
function performWorkUntilDeadline() {
  if (scheduledHostCallback !== null) {
    const currentTime = getCurrentTime();
    const hasTimeRemaining = true;
    
    let hasMoreWork = true;
    try {
      hasMoreWork = scheduledHostCallback(hasTimeRemaining, currentTime);
    } finally {
      if (hasMoreWork) {
        schedulePerformWorkUntilDeadline();
      } else {
        isMessageLoopRunning = false;
        scheduledHostCallback = null;
      }
    }
  } else {
    isMessageLoopRunning = false;
  }
}

🎯 实际应用示例

1. 自定义调度器

// 自定义调度器示例
class CustomScheduler {
  constructor() {
    this.taskQueue = [];
    this.isRunning = false;
  }
  
  scheduleTask(priority, callback) {
    const task = {
      id: Date.now(),
      priority,
      callback,
      startTime: Date.now(),
    };
    
    this.taskQueue.push(task);
    this.taskQueue.sort((a, b) => a.priority - b.priority);
    
    if (!this.isRunning) {
      this.run();
    }
  }
  
  run() {
    this.isRunning = true;
    
    const processTask = () => {
      if (this.taskQueue.length === 0) {
        this.isRunning = false;
        return;
      }
      
      const task = this.taskQueue.shift();
      task.callback();
      
      // 使用 requestIdleCallback 或 setTimeout 继续处理
      if (typeof requestIdleCallback !== 'undefined') {
        requestIdleCallback(processTask);
      } else {
        setTimeout(processTask, 0);
      }
    };
    
    processTask();
  }
}

// 使用示例
const scheduler = new CustomScheduler();

scheduler.scheduleTask(1, () => console.log('High priority task'));
scheduler.scheduleTask(3, () => console.log('Normal priority task'));
scheduler.scheduleTask(5, () => console.log('Low priority task'));

2. 优先级监控

// 监控任务优先级
function usePriorityMonitor() {
  const [highPriorityTasks, setHighPriorityTasks] = useState(0);
  const [normalPriorityTasks, setNormalPriorityTasks] = useState(0);
  const [lowPriorityTasks, setLowPriorityTasks] = useState(0);
  
  useEffect(() => {
    if (__DEV__) {
      const originalScheduleCallback = scheduleCallback;
      
      scheduleCallback = function(priorityLevel, callback, options) {
        // 统计不同优先级的任务数量
        switch (priorityLevel) {
          case ImmediatePriority:
          case UserBlockingPriority:
            setHighPriorityTasks(prev => prev + 1);
            break;
          case NormalPriority:
            setNormalPriorityTasks(prev => prev + 1);
            break;
          case LowPriority:
          case IdlePriority:
            setLowPriorityTasks(prev => prev + 1);
            break;
        }
        
        return originalScheduleCallback.call(this, priorityLevel, callback, options);
      };
      
      return () => {
        scheduleCallback = originalScheduleCallback;
      };
    }
  }, []);
  
  return { highPriorityTasks, normalPriorityTasks, lowPriorityTasks };
}

3. 性能优化

// 使用调度器优化性能
function OptimizedComponent({ data }) {
  const [processedData, setProcessedData] = useState([]);
  const [isProcessing, setIsProcessing] = useState(false);
  
  useEffect(() => {
    if (data.length > 1000) {
      setIsProcessing(true);
      
      // 使用低优先级处理大量数据
      const task = scheduleCallback(LowPriority, () => {
        const result = data.map(item => ({
          ...item,
          processed: expensiveProcessing(item)
        }));
        setProcessedData(result);
        setIsProcessing(false);
      });
      
      return () => {
        // 取消任务
        if (task) {
          cancelCallback(task);
        }
      };
    } else {
      // 小数据量直接处理
      setProcessedData(data.map(item => ({
        ...item,
        processed: expensiveProcessing(item)
      })));
    }
  }, [data]);
  
  return (
    <div>
      {isProcessing && <div>处理中...</div>}
      <DataList data={processedData} />
    </div>
  );
}

🚀 性能优化技巧

1. 合理使用优先级

// ✅ 正确:根据任务重要性设置优先级
function handleUserInput(event) {
  // 用户输入使用高优先级
  scheduleCallback(UserBlockingPriority, () => {
    updateUI(event.target.value);
  });
}

function handleBackgroundTask() {
  // 后台任务使用低优先级
  scheduleCallback(LowPriority, () => {
    processBackgroundData();
  });
}

2. 避免长时间阻塞

// ❌ 错误:长时间阻塞主线程
function badExample() {
  const result = [];
  for (let i = 0; i < 1000000; i++) {
    result.push(expensiveCalculation(i));
  }
  return result;
}

// ✅ 正确:使用时间切片
function goodExample() {
  return new Promise((resolve) => {
    const result = [];
    let index = 0;
    
    function processChunk() {
      const startTime = Date.now();
      
      while (index < 1000000 && Date.now() - startTime < 5) {
        result.push(expensiveCalculation(index));
        index++;
      }
      
      if (index < 1000000) {
        // 继续处理下一块
        scheduleCallback(NormalPriority, processChunk);
      } else {
        resolve(result);
      }
    }
    
    processChunk();
  });
}

3. 任务取消机制

// 实现任务取消
function useCancellableTask() {
  const [taskId, setTaskId] = useState(null);
  
  const startTask = useCallback((callback) => {
    const task = scheduleCallback(NormalPriority, callback);
    setTaskId(task.id);
    return task;
  }, []);
  
  const cancelTask = useCallback(() => {
    if (taskId) {
      cancelCallback({ id: taskId });
      setTaskId(null);
    }
  }, [taskId]);
  
  return { startTask, cancelTask };
}

📝 总结

Scheduler 通过以下机制实现高效的任务调度:

  1. 优先级系统:5 个优先级等级,不同优先级有不同的过期时间
  2. 时间切片:将长时间任务分割成小块,避免阻塞主线程
  3. 任务队列:使用小顶堆管理任务,按优先级排序
  4. 饥饿问题解决:通过过期时间机制防止低优先级任务饿死

理解 Scheduler 有助于我们:

  • 更好地理解 React 的并发渲染机制
  • 编写更高效的异步代码
  • 进行性能优化
  • 实现自定义的调度逻辑

下一步Hooks 深度解析 - 深入分析更多 Hooks 的实现细节