时间分片调度器

时间分片调度器

本文是针对大数据表单元格批量请求优化方案一文中提及的“请求队列”的扩充实现

第一步:实现一个定时器

笔者考虑使用 setInterval 进行定时触发,但遇到一个问题,当页面渲染较多元素时,渲染线程占满了浏览器,导致脚本被延迟触发,定时间隔不稳定。因此,考虑在 worker 线程中执行 setInterval 定时器。

控制器接受两个操作,start和stop,对应开始计时和停止计时,开始后通过run事件回调给主线程。

const schedulerMap: Record<string, any> = {};

addEventListener('message', (e: MessageEvent) => {
  const { id, method, interval = 100 } = e.data;
  if (method === 'start') {
    if (!schedulerMap[id]) {
      schedulerMap[id] = setInterval(() => {
        postMessage({
          id,
          method: 'run',
        });
      }, interval);
    }
  } else if (method === 'stop') {
    clearInterval(schedulerMap[id]);
    delete schedulerMap[id];
  }
});

第二步:在主线程中监听worker事件

const worker = new BatchSchedulerWorker();

worker.addEventListener('message', (e) => {
  const { id, method } = e.data;

  if (method === 'run') {
    _batchGetDataCallback(id);
  }
});

第三步:实现一个数据收集模块

首先需要定义相关参数:

interface ISchedulerProps<T, K> {
  // 全局唯一命名
  schedulerName: string;
  // 批量处理的内容
  keys: K[];
  // 构建映射表
  buildMappingTable(params: K[]): Promise<Record<string, T>>;
  // 成功回调
  callback?: (params: Record<string, T>) => void;
  // 可选属性
  options?: {
    // 收集间隔
    interval?: number;
    // 是否开启缓存
    cache?: boolean;
    // 缓存过期时间
    cacheTTL?: number;
  };
}

// 模块定义
export const timeSlicedScheduler = <T, K = string>(
  params: ISchedulerProps<T, K>,
): Promise<Record<string, K>> => {}

这里需要同时实现两种调用方式:Promise调用和Callback调用,方便外部按需取用。

首先进行参数解析,赋予默认值:

const {
  schedulerName,
  keys,
  buildMappingTable: handler,
  callback = () => {
    //
  },
  options = {
    interval: 100,
    cache: false,
    cacheTTL: LRU_MAX_AGE,
    biz: 'hk4e',
  },
} = params;

const {
  interval = 100,
  cache = false,
  cacheTTL = LRU_MAX_AGE,
} = options;

const apiKey = `${schedulerName || handler.name || UNKNOWN_API_NAME}`;

然后将数据构造成内部需要的数据格式,并返回一个Promise,供外部使用。

const promise = new Promise<Record<string, T>>((resolved) => {
  const cachePrefix = `TIME_${apiKey}`;
  const apiCfg = apiConfigMap.get(apiKey);
  const uniqData = [...new Set(keys)];
  if (apiCfg) {
    apiCfg.data.push(...uniqData);
    apiCfg.callbacks.push({
      callback,
      promiseResolved: resolved,
      data: uniqData,
    });
    apiCfg.cache = cache;
    apiCfg.cacheTTL = cacheTTL;
  } else {
    apiConfigMap.set(apiKey, {
      data: [...uniqData],
      handler,
      callbacks: [{ callback, promiseResolved: resolved, data: uniqData }],
      cachePrefix,
      cache,
      cacheTTL,
    });
  }

  worker.postMessage({
    method: 'start',
    id: apiKey,
    interval: interval,
  });
});

return promise;

第四步:批量处理回调

  1. 首先读取是否还有存量数据,如果没有,停止调度触发器
  2. 如果用户开启缓存,执行以下操作:如果在缓存映射表找到对应键,则直接从缓存中读取数据,同时将该数据键移出请求队列
  3. 发起请求
  4. 收集请求数据,并通过外部传入的buildMappingTable处理成key-value形式(方便将各个形态不一的数据结构处理成能缓存的结构)
  5. 返回数据,同时触发用户传入的Callback和对应的Promise,完成
const _batchGetDataCallback = async (id: string) => {
  const apiCfg = apiConfigMap.get(id);
  if (!apiCfg) return;

  const { data, cache, cachePrefix, cacheTTL, handler } = apiCfg;

  const len = data.length;
  if (!len) {
    worker.postMessage({
      method: 'stop',
      id,
    });
    return;
  }

  const valueSet = new Set(data);
  const fns = apiCfg.callbacks.slice(0, len);
  apiCfg.data.splice(0, len);
  apiCfg.callbacks.splice(0, len);

  const cacheRes: Record<string, any> = {};
  if (cache) {
    [...valueSet].forEach((code) => {
      const cacheData = resultCache.get(`${cachePrefix}_${code}`);
      if (cacheData) {
        cacheRes[code] = cacheData;
        valueSet.delete(code);
      }
    });
  }

  let mergeRes: Record<string, any> = {};

  if (valueSet.size) {
    let originRes: Record<string, any>[] = [];
    try {
      const response = await Promise.allSettled(
        chunk([...valueSet], MAX_REQUEST_COUNT).map((codes) => {
          return handler(codes);
        }),
      );

      const _data: any[] = [];
      response.forEach((item) => {
        if (item.status === 'fulfilled') {
          const data = item.value;
          _data.push(data);
        }
      });
      originRes = _data || [];
    } catch (e) {
      originRes = [];
    }
    const fmtRes = originRes.reduce((curr, next) => {
      return merge(curr, next);
    }, {});
    mergeRes = { ...cacheRes, ...fmtRes };
  } else {
    mergeRes = cacheRes;
  }

  if (cache) {
    Object.keys(mergeRes).forEach((code) => {
      const data = mergeRes[code];
      if (!data) return;
      resultCache.set(`${cachePrefix}_${code}`, cloneDeep(data), {
        ttl: cacheTTL,
      });
    });
  }

  for (const fnCfg of fns) {
    const { callback, promiseResolved, data } = fnCfg;
    const result: Record<string, any> = {};
    data.map((code) => {
      if (mergeRes[code]) {
        result[code] = mergeRes[code];
      }
    }, {});
    callback(result);
    promiseResolved(result);
  }
};

第五步:实战演示


import { timeSlicedScheduler } from '@/utils/batchScheduler.ts'

// 方式一:callback回调
timeSlicedScheduler<string>({
  // 唯一命名
  schedulerName: 'test1',
  // 操作内容,主要是key
  keys: ['1', '2'],
  // 将结果映射成map,方便后续处理(缓存,去重等)
  async buildMappingTable(params) {
    const data = await API({ params });
    const res: Record<string, string> = {}
    data.forEach(d => {
      res[d.key] = d.value;
    })
    return res;
  },
  // 回调
  callback(res: Record<string, string>) {
    // do something
  }, {
        // 参数配置(可选参数)
    interval: 100,   // 收集间隔
        cache: false,    // 是否缓存
        cacheTTL: 0,     // 缓存过期时间
        biz: 'hk4e',     // 业务标识符
  }
});

// 方式二:promise
const res: Record<string, string> = await timeSlicedScheduler<string>({
  schedulerName: 'test1',
  keys: ['1', '2'],
  async buildMappingTable(params) {
    const data = await API({ params });
    const res: Record<string, string> = {}
    data.forEach(d => {
      res[d.key] = d.value;
    })
    return res;
  }
});