Skip to content

Commit 6c14492

Browse files
committed
Semaphore.js modified
1 parent 225b44c commit 6c14492

File tree

1 file changed

+100
-59
lines changed

1 file changed

+100
-59
lines changed

Semaphore.js

Lines changed: 100 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
'use strict';
1+
'use strict'; // this is for ES13 aka ES2022
2+
23
/*S*/ const CONSOLE = this.CONSOLE || ((...a) => { console.log(...a) }); // shall return void 0
34
/*S*/ const _FPC = Function.prototype.call;
45
/*S*/ const DONOTHING = function(){} // The "do nothing" DUMMY function
@@ -10,6 +11,7 @@
1011
/*S*/ const PE = Promise.reject(); PE.catch(DONOTHING); // Promise.reject() but ignore when .catch() is missing
1112
/*S*/ const PO = () => { const o={}; o.p = new Promise((ok,ko) => { o.ok=ok; o.ko=ko }); return o } // PromiseObject
1213
//S//
14+
1315
// Examples:
1416
// for (let i=0; ++i<1000000; ) fetch(`http://example.com/?${i}`); // crashes the Tab
1517
// const fetch50 = Semaphore(50, fetch); // repair
@@ -21,7 +23,7 @@
2123
'waiting was'
2224
);
2325
for (let i=100; --i>0; x(Math.random()*10000|0));
24-
await x.Wait();
26+
await x.Idle();
2527
*/
2628
/*
2729
const sem = Semaphore(1);
@@ -42,7 +44,7 @@
4244
console.log('main');
4345
*/
4446
// Semaphore() returns with following properties:
45-
// .max parameter 1 passed to Semaphore (your chosen max value or function).
47+
// .max parameter 1 passed to Semaphore (your chosen max value or function).
4648
// .fn parameter 2 passed to Semaphore (the function protected by Semaphore)
4749
// .args parameter 3+ passed to Semaphore (the first arguments to .fn)
4850
// .count number of currently running calls
@@ -59,17 +61,25 @@
5961
// .cancel() cancels all
6062
// .cancel(+N) cancels the first N on the waiting list
6163
// .cancel(-N) cancels the last N on the waiting list
62-
// .try() Same as Acquire(), but synchronous. Hence it fails if no Semaphore available (or max() returns a Promise or throws)
64+
// .try() Same as Acquire(), but synchronous. Hence it fails if no Semaphore available (or .max is a function which returns a Promise or throws)
65+
// .acquire() same as .try(). Note that .try() returns void 0 if nothing can be aquired
6366
// .Acquire() acquires 1. returns a Promise which resolves to the "release()" function.
6467
// .Acquire(0) acquires all free (at least 1).
68+
// .Acquire(N,..) and .acquire(N,..) call .max(N,..) if .max is a function
6569
// release() or release(void 0) releases all Acquired, release(1) only releases 1. Throws if "overreleased"
66-
// .Wait() same as .Wait(0): waits for all releases (Semaphore is idle)
67-
// .Wait(N) wait for N Releases. .Wait(-N) returns immediately if less than N is running, else waits for next Release.
70+
// .Idle() same as .Max()
71+
// .free(N) returns the number of currently free slots (==N if N given). 0 if nothing is free (or N cannot be satisfied). Throws if unsatisfyable
72+
// .Free(N) same as .free() but asynchronous. .Free()/.free() work like .Acquire()/.acquire(), but does not return a release() function
73+
// .WaitN(N) wait for N Releases. .WaitN(0) returns immediately if nothing is running, else waits for 1 Release
6874
// .Max(N) wait until .count is not more than N, .Max() is .Max(0)
6975
// .Min(N) wait until .count is at least N, .Min() is .Min(0)
7076
// Min(0) waits until something happens on the Semaphore
7177
// Min(-1) waits until a Release or Acquire
7278
// Min(-2) waits until an Acquire
79+
// .Waiting(N) wait until .wait <= N
80+
//
81+
// .fifo() switches in FiFo-queuing-strategy (first in, first out), default
82+
// .lifo() switches in LiFo-queuing-strategy (last in, first out)
7383
//
7484
// If .max is a function, it is called with the Semaphore (and optional .Acquire() args) can dynamically return how much work to do in parallel.
7585
// If it returns a Promise, execution halts until the Promise resolves. If it rejects or .max() throws, this is as if it returns 1
@@ -80,15 +90,15 @@
8090
const Semaphore = (max, fn, ...args) =>
8191
{
8292
const D = DD('Semaphore');
83-
let run = 0, wait = 0;
93+
let run = 0;
8494
let maxing; // set while run.max() is awaited
8595
let waiting, cntwait;
8696
const waits = [];
8797
const upd = n =>
8898
{
8999
n = n|0;
90100
ret.count = run += n;
91-
ret.wait = waits.length + wait;
101+
ret.wait = waits.length + (waiting?.count|0);
92102
if (n<0 && waiting)
93103
{
94104
const ok = waiting.ok;
@@ -114,22 +124,28 @@ const Semaphore = (max, fn, ...args) =>
114124
x[0](x[2]);
115125
}
116126
}
127+
// XXX TODO XXX
128+
// We should call .max() only once (with the same parameters)
129+
// and cache the result until something changes on the Semaphore.
130+
// This also could improve the non-async case in case the async part already has finished.
131+
// (But this perhaps creates some nondeterministic looking behavior on the non-async calls.)
132+
// !! Be prepared that .max() function is only called on changes in future !!
117133
const get = (...a) =>
118134
{
119135
D('get', a);
120136
upd();
121137
try {
122138
return isFunction(ret.max) ? ret.max(ret, ...a) : ret.max;
123139
} catch (e) {
124-
return PE;
140+
return PE; // This is an internal function, so do not call global error handler in case we are rejected
125141
}
126142
}
127143
const next = _ =>
128144
{
129145
D('next', _);
130146
if (maxing) return upd();
131147
const limit = get();
132-
if (limit && limit.then)
148+
if (limit?.then)
133149
maxing = Promise.resolve(limit).then(check, _=>check(1)); // in case of max() failing, we just ensure one semaphore runs so this is called again
134150
else
135151
check(limit);
@@ -153,10 +169,10 @@ const Semaphore = (max, fn, ...args) =>
153169
function release(k)
154170
{
155171
D('release', k);
156-
if (k===void 0 && !(k=n)) THROW(`release(): already fully released`);
172+
if (k===void 0 && !(k=n)) THROW(`Semaphore.release(): already fully released`);
157173
k = k|0;
158-
if (k<0) THROW(`release(${k}): negative`);
159-
if (n<k) THROW(`release(${k}): too high (max ${n})`);
174+
if (k<0) THROW(`Semaphore.release(${k}): negative`);
175+
if (n<k) THROW(`Semaphore.release(${k}): too high (max ${n})`);
160176
release.left = n -= k;
161177
upd(-k);
162178
return release;
@@ -168,115 +184,140 @@ const Semaphore = (max, fn, ...args) =>
168184
release.run = (fn, ...args) => { CATCH$$(fn, release(0), args); return release(0) }
169185
return release(0);
170186
}
171-
const acquire = (N,...a) => // acquire('1') works. acquire('0') throws!
187+
const free = (N,...a) => // .free('1') works. .free('0') throws! This is intended
172188
{
173189
D('try', N,a);
174190
// if (maxing) return; // max is already resolving -> nope, perhaps max() behaves differently here
175191
let n = N === void 0 ? 1 : N|0; // This works for classes with toString() returning nonnull integer
176-
if (!n && N!==0) THROW(`.acquire(${N}): nonnumeric`);
177-
if (n<0) THROW('.aquire(${n}): negative');
192+
if (!n && N!==0) THROW(`Semaphore: nonnumeric paramter ${N}`);
193+
if (n<0) THROW(`Semaphore: negative parameter ${n}`);
178194

179195
let limit = get(N,...a); // passing N, not n
180-
if (limit && limit.then) THROW(`cannot use async .max() in non-async .acquire()`);
196+
if (limit?.then) THROW(`Semaphore: cannot use async .max() in non-async call`);
181197
limit = limit|0;
182198

183199
if (!n)
184200
{
185-
if (limit<=0) THROW(`.acquire(${n}): unlimited (.max is ${limit})`);
201+
if (limit<=0) THROW(`Semaphore: unlimited (.max is ${limit})`);
186202
n = limit-run;
187-
if (n<1) return; // Too much
203+
if (n<1) return 0; // Nothing free
188204
}
189205
else if (limit>0)
190206
{
191-
if (n>limit) THROW(`.acquire(${n}): unsatisfyable (max is ${limit})`);
192-
if (run+n>limit) return; // Too much
207+
if (n>limit) THROW(`Semaphore: unsatisfyable ${n} (.max is ${limit})`);
208+
if (run+n>limit) return 0; // Not enough free
193209
}
194-
return release_function(n);
195-
}
196-
const step = () =>
197-
{
198-
if (!cntwait)
199-
cntwait = PO();
200-
return cntwait.p;
210+
return n;
201211
}
202-
const work = () =>
212+
const acquire = (...a) => { const n = free(...a); return n ? release_function(n) : void 0 }
213+
214+
const Waiting = async N =>
203215
{
204-
if (!waiting)
205-
waiting = PO();
206-
waiting.p.finally(() => wait--); // this to work correctly needs a sane implementation of Promise()
207-
wait++;
208-
return waiting.p;
216+
N = N|0;
217+
while (ret.wait>N)
218+
{
219+
if (!cntwait)
220+
cntwait = PO();
221+
await cntwait.p;
222+
}
223+
return ret;
209224
}
210225
const Max = async N =>
211226
{
212227
N = N|0;
213-
while (run>N)
214-
await step();
228+
while (ret.count>N)
229+
{
230+
if (!cntwait)
231+
cntwait = PO();
232+
await cntwait.p;
233+
}
215234
return ret;
216235
}
217236
const Min = async N =>
218237
{
219238
N = N|0;
220-
if (N<=0 || run<N)
239+
if (N<=0 || ret.count<N)
221240
do
222241
{
223-
const n = await step();
242+
if (!cntwait)
243+
cntwait = PO();
244+
const n = await cntwait.p;
224245
if (N<0 && !n || N<-1 && n<0)
225246
continue;
226-
} while (run<N);
247+
} while (ret.count<N);
227248
return ret;
228249
}
229-
const Wait = async N =>
250+
const WaitN = async N =>
230251
{
231252
N = N|0;
232-
if (!N)
233-
while (run || wait || waits.length)
234-
await work();
235-
else if (N>0 || run>=-N)
236-
while ((await work())>=0 || --N>0);
253+
if (N<=0 && !ret.count) return ret;
254+
do
255+
{
256+
if (!waiting)
257+
waiting = PO();
258+
if ((await waiting.p)>=0)
259+
continue;
260+
} while (--N>0);
237261
return ret;
238262
}
239263

240-
// Sadly I found no good way to reuse things here
264+
// Sadly I found no good way to reuse things (.free) here
241265
// XXX TODO XXX implement with Revocable above!
242-
const Acquire = async (N,...a) =>
266+
const Free = async (N,...a) =>
243267
{
244-
D('Acquire', N,a);
268+
D('Free', N,a);
245269
let n = N === void 0 ? 1 : N|0; // This works for classes with toString() returning nonnull integer
246-
if (!n && N!==0) THROW(`.Acquire(${N}): nonnumeric`);
247-
if (n<0) THROW('.Aquire(${n}): negative');
270+
if (!n && N!==0) THROW(`Semaphore: nonnumeric paramter ${N}`);
271+
if (n<0) THROW(`Semaphore: negative parameter ${n}`);
248272

249273
for (;;)
250274
{
251275
const limit = (await get(N,...a))|0; // passing N, not n
252-
if (!n && limit<=0) THROW(`.Acquire(${n}): unlimited (.max is ${limit})`);
253-
if (n && limit<n) THROW(`.Acquire(${n}): unsatisfyable (.max is ${limit})`);
276+
if (!n && limit<=0) THROW(`Semaphore: unlimited (.max is ${limit})`);
277+
if ( n && limit< n) THROW(`Semaphore: unsatisfyable ${n} (.max is ${limit})`);
254278

255279
if (run < limit && run+n <= limit)
256-
return release_function(n ? n : limit-run);
280+
return n ? n : limit-run;
257281

282+
if (!waiting)
283+
waiting = PO();
284+
waiting.count = (waiting.count|0) + 1; // Either .Acquire() or .Free() are waiting, too, so increase .wait()
258285
upd();
259-
D('Acquire', 'wait');
260-
await work();
261-
D('Acquire', 'cont');
286+
D('Free', 'wait');
287+
await waiting.p;
288+
D('Free', 'cont');
262289
}
263290
}
264-
const ret = (..._) => next(new Promise((ok,ko) => waits.push([ok,ko,_])).then(() => (ret.fn ? ret.fn : (...a)=>a)(...ret.args,..._)).finally(() => next(upd(-1))));
291+
const Acquire = async (...a) => release_function(await Free(...a));
292+
293+
let discipline = 'push';
294+
295+
const ret = (..._) => next(new Promise((ok,ko) => waits[discipline]([ok,ko,_])).then(() => (ret.fn ? ret.fn : (...a)=>a)(...ret.args,..._)).finally(() => next(upd(-1))));
296+
ret.lifo = () => { discipline='unshift'; return ret; };
297+
ret.fifo = () => { discipline='push'; return ret; };
265298
ret.max = max;
266299
ret.fn = fn;
267300
ret.args = args;
268301
ret.cancel = cancel;
269302
ret.stop = () => { maxing = true; return ret };
270303
ret.start = () => { if (maxing===true) maxing=false; return next(ret) }
271304
ret.try = acquire;
305+
ret.acquire = acquire;
272306
ret.Acquire = Acquire;
307+
ret.free = free;
308+
ret.Free = Free;
273309
ret.Max = Max; // wait for max running
310+
ret.Idle = Max; // wait for Semaphore being idle, convenience
274311
ret.Min = Min; // wait for N started
275-
ret.Wait = Wait; // wait for N releases
312+
ret.WaitN = WaitN; // wait for N releases
313+
ret.Wait = _ => { console.debug('Semaphore.Wait() deprecated, use Semaphore.WaitN()'); return WaitN(_) }
314+
ret.Waiting = Waiting; // wait until N or less are waiting
276315
// ret.release = release; // I really have no good idea how to implement this the sane way in an async world
277-
// XXX TODO XXX await sem.aquire(2) /* not saving return */; ..; sem.release(1); ..; sem.release(1); ..; sem.release(1) ==> throws
316+
// XXX TODO XXX await sem.Aquire(2) /* not saving return */; ..; sem.release(1); ..; sem.release(1); ..; sem.release(1) ==> throws
278317
// XXX TODO XXX .abort() to abort running Promises (if there is some clever way)
279318
return ret;
280319
}
281320
//S//
321+
282322
module.exports = Semaphore;
323+

0 commit comments

Comments
 (0)