时间分片调度器
本文是针对大数据表单元格批量请求优化方案一文中提及的“请求队列”的扩充实现
第一步:实现一个定时器
笔者考虑使用 setInterval 进行定时触发,但遇到一个问题,当页面渲染较多元素时,渲染线程占满了浏览器,导致脚本被延迟触发,定时间隔不稳定。因此,考虑在 worker 线程中执行 setInterval 定时器。
控制器接受两个操作,start和stop,对应开始计时和停止计时,开始后通过run事件回调给主线程。
const schedulerMap: Record<string, any> = {};
addEventListener('message', (e: MessageEvent) => {
const { id, method, interval = 100 } = e.data;
if (method === 'start') {
if (!schedulerMap[id]) {
schedulerMap[id] = setInterval(() => {
postMessage({
id,
method: 'run',
});
}, interval);
}
} else if (method === 'stop') {
clearInterval(schedulerMap[id]);
delete schedulerMap[id];
}
});
第二步:在主线程中监听worker事件
const worker = new BatchSchedulerWorker();
worker.addEventListener('message', (e) => {
const { id, method } = e.data;
if (method === 'run') {
_batchGetDataCallback(id);
}
});
第三步:实现一个数据收集模块
首先需要定义相关参数:
interface ISchedulerProps<T, K> {
// 全局唯一命名
schedulerName: string;
// 批量处理的内容
keys: K[];
// 构建映射表
buildMappingTable(params: K[]): Promise<Record<string, T>>;
// 成功回调
callback?: (params: Record<string, T>) => void;
// 可选属性
options?: {
// 收集间隔
interval?: number;
// 是否开启缓存
cache?: boolean;
// 缓存过期时间
cacheTTL?: number;
};
}
// 模块定义
export const timeSlicedScheduler = <T, K = string>(
params: ISchedulerProps<T, K>,
): Promise<Record<string, K>> => {}
这里需要同时实现两种调用方式:Promise调用和Callback调用,方便外部按需取用。
首先进行参数解析,赋予默认值:
const {
schedulerName,
keys,
buildMappingTable: handler,
callback = () => {
//
},
options = {
interval: 100,
cache: false,
cacheTTL: LRU_MAX_AGE,
biz: 'hk4e',
},
} = params;
const {
interval = 100,
cache = false,
cacheTTL = LRU_MAX_AGE,
} = options;
const apiKey = `${schedulerName || handler.name || UNKNOWN_API_NAME}`;
然后将数据构造成内部需要的数据格式,并返回一个Promise,供外部使用。
const promise = new Promise<Record<string, T>>((resolved) => {
const cachePrefix = `TIME_${apiKey}`;
const apiCfg = apiConfigMap.get(apiKey);
const uniqData = [...new Set(keys)];
if (apiCfg) {
apiCfg.data.push(...uniqData);
apiCfg.callbacks.push({
callback,
promiseResolved: resolved,
data: uniqData,
});
apiCfg.cache = cache;
apiCfg.cacheTTL = cacheTTL;
} else {
apiConfigMap.set(apiKey, {
data: [...uniqData],
handler,
callbacks: [{ callback, promiseResolved: resolved, data: uniqData }],
cachePrefix,
cache,
cacheTTL,
});
}
worker.postMessage({
method: 'start',
id: apiKey,
interval: interval,
});
});
return promise;
第四步:批量处理回调
- 首先读取是否还有存量数据,如果没有,停止调度触发器
- 如果用户开启缓存,执行以下操作:如果在缓存映射表找到对应键,则直接从缓存中读取数据,同时将该数据键移出请求队列
- 发起请求
- 收集请求数据,并通过外部传入的buildMappingTable处理成key-value形式(方便将各个形态不一的数据结构处理成能缓存的结构)
- 返回数据,同时触发用户传入的Callback和对应的Promise,完成
const _batchGetDataCallback = async (id: string) => {
const apiCfg = apiConfigMap.get(id);
if (!apiCfg) return;
const { data, cache, cachePrefix, cacheTTL, handler } = apiCfg;
const len = data.length;
if (!len) {
worker.postMessage({
method: 'stop',
id,
});
return;
}
const valueSet = new Set(data);
const fns = apiCfg.callbacks.slice(0, len);
apiCfg.data.splice(0, len);
apiCfg.callbacks.splice(0, len);
const cacheRes: Record<string, any> = {};
if (cache) {
[...valueSet].forEach((code) => {
const cacheData = resultCache.get(`${cachePrefix}_${code}`);
if (cacheData) {
cacheRes[code] = cacheData;
valueSet.delete(code);
}
});
}
let mergeRes: Record<string, any> = {};
if (valueSet.size) {
let originRes: Record<string, any>[] = [];
try {
const response = await Promise.allSettled(
chunk([...valueSet], MAX_REQUEST_COUNT).map((codes) => {
return handler(codes);
}),
);
const _data: any[] = [];
response.forEach((item) => {
if (item.status === 'fulfilled') {
const data = item.value;
_data.push(data);
}
});
originRes = _data || [];
} catch (e) {
originRes = [];
}
const fmtRes = originRes.reduce((curr, next) => {
return merge(curr, next);
}, {});
mergeRes = { ...cacheRes, ...fmtRes };
} else {
mergeRes = cacheRes;
}
if (cache) {
Object.keys(mergeRes).forEach((code) => {
const data = mergeRes[code];
if (!data) return;
resultCache.set(`${cachePrefix}_${code}`, cloneDeep(data), {
ttl: cacheTTL,
});
});
}
for (const fnCfg of fns) {
const { callback, promiseResolved, data } = fnCfg;
const result: Record<string, any> = {};
data.map((code) => {
if (mergeRes[code]) {
result[code] = mergeRes[code];
}
}, {});
callback(result);
promiseResolved(result);
}
};
第五步:实战演示
import { timeSlicedScheduler } from '@/utils/batchScheduler.ts'
// 方式一:callback回调
timeSlicedScheduler<string>({
// 唯一命名
schedulerName: 'test1',
// 操作内容,主要是key
keys: ['1', '2'],
// 将结果映射成map,方便后续处理(缓存,去重等)
async buildMappingTable(params) {
const data = await API({ params });
const res: Record<string, string> = {}
data.forEach(d => {
res[d.key] = d.value;
})
return res;
},
// 回调
callback(res: Record<string, string>) {
// do something
}, {
// 参数配置(可选参数)
interval: 100, // 收集间隔
cache: false, // 是否缓存
cacheTTL: 0, // 缓存过期时间
biz: 'hk4e', // 业务标识符
}
});
// 方式二:promise
const res: Record<string, string> = await timeSlicedScheduler<string>({
schedulerName: 'test1',
keys: ['1', '2'],
async buildMappingTable(params) {
const data = await API({ params });
const res: Record<string, string> = {}
data.forEach(d => {
res[d.key] = d.value;
})
return res;
}
});