假设有 10 个请求,但是最大的并发数目是 5 个,并且要求拿到请求结果,这样就是一个简单的并发请求控制
利用 setTimeout 实行简单模仿一个请求
let startTime = Date.now(); const timeout = (timeout: number, ret: number) => { return (idx?: any) => new Promise((resolve) => { setTimeout(() => { const compare = Date.now() - startTime; console.log(`At ${Math.floor(compare / 100)}00 return`, ret); resolve(idx); }, timeout); }); }; const timeout1 = timeout(1000, 1); const timeout2 = timeout(300, 2); const timeout3 = timeout(400, 3); const timeout4 = timeout(500, 4); const timeout5 = timeout(200, 5);
通过这样来模拟请求,本质就是 Promise
const run = async () => { startTime = Date.now(); await Promise.all([ timeout1(), timeout2(), timeout3(), timeout4(), timeout5(), ]); }; run(); At 200 return 5 At 300 return 2 At 400 return 3 At 500 return 4 At 1000 return 1
可以看到输出是 5 2 3 4 1 ,按 timeout 的时间输出了
假设同时间最大并发数目是 2,创建一个类
class Concurrent { private maxConcurrent: number = 2; constructor(count: number = 2) { this.maxConcurrent = count; } }
想一下,按最大并发数拆分 Promise 数组,如果有 Promise 被 fulfilled 的时候,就移除掉,然后把 pending 状态的 Promise ,加进来。Promise.race 可以帮我们满足这个需求
class Concurrent { private maxConcurrent: number = 2; constructor(count: number = 2) { this.maxConcurrent = count; } public async useRace(fns: Function[]) { const runing: any[] = []; // 按并发数,把 Promise 加进去 // Promise 会回调一个索引,方便我们知道哪个 Promise 已经 resolve 了 for (let i = 0; i < this.maxConcurrent; i++) { if (fns.length) { const fn = fns.shift()!; runing.push(fn(i)); } } const handle = async () => { if (fns.length) { const idx = await Promise.race<number>(runing); const nextFn = fns.shift()!; // 移除已经完成的 Promise,把新的进去 runing.splice(idx, 1, nextFn(idx)); handle(); } else { // 如果数组已经被清空了,表面已经没有需要执行的 Promise 了,可以改成 Promise.all await Promise.all(runing); } }; handle(); } } const run = async () => { const concurrent = new Concurrent(); startTime = Date.now(); await concurrent.useRace([timeout1, timeout2, timeout3, timeout4, timeout5]); }; At 300 return 2 At 700 return 3 At 1000 return 1 At 1200 return 5 At 1200 return 4
可以看到输出已经变了,为什么会这样呢,分析一下,最大并发数 2
// 首先执行的是 1 2
1 需要 1000 MS 才执行完
2 需要 300 MS2 执行完,时间线变成 300 移除 2 加入 3 开始执行 3
3 需要 400MS 执行完时间变成 700 移除 3 加入 4 开始执行 4
4 需要 500MS
时间线来到 1000MS,1 执行完 移除 1 加入 5 开始执行 5
时间线来到 1200MS,4 和 5 刚好同时执行完
可以利用 await 的机制,其实也是一个小技巧
await 表达式会暂停当前 async function 的执行,等待 Promise 处理完成。若 Promise 正常处理(fulfilled),其回调的 resolve 函数参数作为 await 表达式的值,继续执行 async function。
如果当前的并发数已经超过最大的并发数目了,可以设置一个新的 Promise,并且 await,等待其他的请求完成的时候,resolve,移除等待,所以需要新增两个状态,当前的并发数目,还有用来存储 resolve 这个回调函数的数组
class Concurrent { private maxConcurrent: number = 2; private list: Function[] = []; private currentCount: number = 0; constructor(count: number = 2) { this.maxConcurrent = count; } public async add(fn: Function) { this.currentCount += 1; // 如果最大已经超过最大并发数 if (this.currentCount > this.maxConcurrent) { // wait 是一个 Promise,只要调用 resolve 就会变成 fulfilled 状态 const wait = new Promise((resolve) => { this.list.push(resolve); }); // 在没有调用 resolve 的时候,这里会一直阻塞 await wait; } // 执行函数 await fn(); this.currentCount -= 1; if (this.list.length) { // 把 resolve 拿出来,调用,这样 wait 就完成了,可以往下面执行了 const resolveHandler = this.list.shift()!; resolveHandler(); } } } const run = async () => { const concurrent = new Concurrent(); startTime = Date.now(); concurrent.add(timeout1); concurrent.add(timeout2); concurrent.add(timeout3); concurrent.add(timeout4); concurrent.add(timeout5); }; run(); At 300 return 2 At 700 return 3 At 1000 return 1 At 1200 return 5 At 1200 return 4
这两种方式都可以实现并发控制,只不过实现的方式不太一样,主要都是靠 Promise 实现,另外实现方式里面没有考虑异常的情况,这个可以自己加上
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:mmqy2019@163.com进行举报,并提供相关证据,查实之后,将立刻删除涉嫌侵权内容。
长按识别二维码并关注微信
更方便到期提醒、手机管理