diff --git a/docs/openaf-advanced.md b/docs/openaf-advanced.md index ee4b2893..df8fb47f 100644 --- a/docs/openaf-advanced.md +++ b/docs/openaf-advanced.md @@ -102,9 +102,10 @@ Favor built-in job shortcuts `(if)`, `(repeat)`, `(each)`, `(parallel)` to minim ## 14. Asynchronous Execution with oPromise -- **$do** – queue work on the standard ForkJoin-backed pool and receive an `oPromise` for fluent `.then` / `.catch` composition. The resolver passed into your function can resolve with returned values or explicit `resolve()` calls, while thrown errors or `reject()` calls route to the rejection chain.【F:js/openaf.js†L13130-L13157】【F:js/openaf.js†L12208-L12251】 -- **$doV** – same contract as `$do` but targets a virtual-thread-per-task executor so launching many concurrent tasks will not consume native threads when the JVM supports Project Loom virtual threads.【F:js/openaf.js†L12145-L12163】【F:js/openaf.js†L13148-L13157】 -- **Coordination helpers** – mix `$doAll` / `$doFirst` (wrappers over `oPromise.all()` / `.race()`) to wait for all tasks or the first completion, enabling fan-out/fan-in patterns without manual synchronization primitives.【F:js/openaf.js†L13159-L13179】【F:js/openaf.js†L12253-L12348】 +- **$do** – queue work on the standard ForkJoin-backed pool and receive an `oPromise` for fluent `.then` / `.catch` composition. The resolver passed into your function can resolve with returned values or explicit `resolve()` calls, while thrown errors or `reject()` calls route to the rejection chain.【F:js/openaf.js†L13426-L13455】【F:js/openaf.js†L12472-L12528】 +- **$doV** – same contract as `$do` but targets a virtual-thread-per-task executor so launching many concurrent tasks will not consume native threads when the JVM supports Project Loom virtual threads.【F:js/openaf.js†L13444-L13455】【F:js/openaf.js†L12421-L12438】 +- **Coordination helpers** – mix `$doAll` / `$doFirst` (wrappers over `oPromise.all()` / `.race()`) to wait for all tasks or the first completion, enabling fan-out/fan-in patterns without manual synchronization primitives.【F:js/openaf.js†L13459-L13479】【F:js/openaf.js†L12532-L12589】 +- **Cancellation** – call `.cancel()` on any `$do` / `$doV` promise to interrupt the associated thread (mirroring the Threads plugin) and drive the chain into the rejection path for cleanup.【F:js/openaf.js†L13426-L13455】【F:js/openaf.js†L12664-L12683】 Example fan-out flow using virtual threads: diff --git a/js/openaf.js b/js/openaf.js index 3889c028..b4ce972f 100644 --- a/js/openaf.js +++ b/js/openaf.js @@ -12481,15 +12481,16 @@ const __getThreadPools = function() { * Optionally if useVirtualThreads is true, the aFunction will be executed in a virtual thread, otherwise it will be executed in a normal thread. * */ - const oPromise = function(aFunction, aRejFunction, useVirtualThreads) { - this.states = { - NEW: 0, FULFILLED: 1, PREFAILED: 2, FAILED: 3 - }; - - this.state = $atomic(this.states.NEW, "int"); - this.executing = $atomic(false, "boolean"); - this.executors = new java.util.concurrent.ConcurrentLinkedQueue(); - this.vThreads = useVirtualThreads || false +const oPromise = function(aFunction, aRejFunction, useVirtualThreads) { + this.states = { + NEW: 0, FULFILLED: 1, PREFAILED: 2, FAILED: 3 + }; + + this.state = $atomic(this.states.NEW, "int"); + this.executing = $atomic(false, "boolean"); + this.executors = new java.util.concurrent.ConcurrentLinkedQueue(); + this.vThreads = useVirtualThreads || false + this.__thread = __ this.then(aFunction, aRejFunction); }; @@ -12660,92 +12661,110 @@ oPromise.prototype.resolve = function(aValue) { return this; }; -oPromise.prototype.cancel = function() { - if (isDef(this.__f)) { - return this.__f.cancel(true); - } -}; +oPromise.prototype.cancel = function(aReason) { + if (this.state.get() == this.states.FULFILLED || this.state.get() == this.states.FAILED) return false + + this.reason = isDef(aReason) ? aReason : "cancelled" + this.state.set(this.states.PREFAILED) + + var cancelled = false + + try { + if (isDef(this.__f)) cancelled = this.__f.cancel(true) + } catch(e) {} + + try { + if (isDef(this.__thread) && this.__thread.isAlive()) { this.__thread.interrupt(); cancelled = true } + } catch(e) {} + + this.__exec() + + return cancelled +} oPromise.prototype.__exec = function() { var thisOP = this; - do { - try { - this.__f = __getThreadPool(this.vThreads).submit(new java.lang.Runnable({ - run: () => { - //var ignore = false; - //syncFn(() => { if (thisOP.executing.get()) ignore = true; else thisOP.executing.set(true); }, thisOP.executing.get()); - if (!thisOP.executing.setIf(false, true)) return - //if (ignore) return; - - try { - while (thisOP.executors.size() > 0) { - var f = thisOP.executors.poll(); - // Exec - if (thisOP.state.get() != thisOP.states.PREFAILED && - thisOP.state.get() != thisOP.states.FAILED && - f != null && isDef(f) && f.type == "exec" && isDef(f.func) && isFunction(f.func)) { - var res, done = false; - try { - var checkResult = $atomic(true, "boolean"); - if (isDef(thisOP.value)) { - res = f.func(thisOP.value); - } else { - res = f.func(function (v) { checkResult.set(false); thisOP.resolve(v); }, - function (r) { checkResult.set(false); thisOP.reject(r); }); - } - - if (checkResult.get() && - (isJavaObject(res) || isDef(res)) && - res != null && - (thisOP.state.get() == thisOP.states.NEW || thisOP.state.get() == thisOP.states.FULFILLED)) { - res = thisOP.resolve(res); - } - } catch (e) { - thisOP.reject(e); - } - } - // Reject - if (thisOP.state.get() == thisOP.states.PREFAILED || thisOP.state.get() == thisOP.states.FAILED) { - while (f != null && isDef(f) && f.type != "reject" && isDef(f.func) && isFunction(f.func)) { - f = thisOP.executors.poll(); - } + do { + try { + this.__f = __getThreadPool(this.vThreads).submit(new java.lang.Runnable({ + run: () => { + //var ignore = false; + //syncFn(() => { if (thisOP.executing.get()) ignore = true; else thisOP.executing.set(true); }, thisOP.executing.get()); + if (!thisOP.executing.setIf(false, true)) return + //if (ignore) return; - if (f != null && isDef(f) && isDef(f.func) && isFunction(f.func)) { - try { - f.func(thisOP.reason); - thisOP.state.set(thisOP.states.FULFILLED); - } catch (e) { - thisOP.state.set(thisOP.states.FAILED); - throw e; - } - } else { - if (isUnDef(f) || f == null) thisOP.state.set(thisOP.states.FAILED); - } - } - } - } catch(ee) { - throw ee; - } finally { - //syncFn(() => { thisOP.executing.set(false); }, thisOP.executing.get()); - thisOP.executing.set(false) + thisOP.__thread = java.lang.Thread.currentThread() - if (thisOP.executors.isEmpty()) { - thisOP.state.setIf(thisOP.states.NEW, thisOP.states.FULFILLED); - thisOP.state.setIf(thisOP.states.PREFAILED, thisOP.states.FAILED); - } - } + try { + while (thisOP.executors.size() > 0) { + var f = thisOP.executors.poll(); + // Exec + if (thisOP.state.get() != thisOP.states.PREFAILED && + thisOP.state.get() != thisOP.states.FAILED && + f != null && isDef(f) && f.type == "exec" && isDef(f.func) && isFunction(f.func)) { + var res, done = false; + try { + var checkResult = $atomic(true, "boolean"); + if (isDef(thisOP.value)) { + res = f.func(thisOP.value); + } else { + res = f.func(function (v) { checkResult.set(false); thisOP.resolve(v); }, + function (r) { checkResult.set(false); thisOP.reject(r); }); + } + + if (checkResult.get() && + (isJavaObject(res) || isDef(res)) && + res != null && + (thisOP.state.get() == thisOP.states.NEW || thisOP.state.get() == thisOP.states.FULFILLED)) { + res = thisOP.resolve(res); + } + } catch (e) { + thisOP.reject(e); + } + } + // Reject + if (thisOP.state.get() == thisOP.states.PREFAILED || thisOP.state.get() == thisOP.states.FAILED) { + while (f != null && isDef(f) && f.type != "reject" && isDef(f.func) && isFunction(f.func)) { + f = thisOP.executors.poll(); + } + + if (f != null && isDef(f) && isDef(f.func) && isFunction(f.func)) { + try { + f.func(thisOP.reason); + thisOP.state.set(thisOP.states.FULFILLED); + } catch (e) { + thisOP.state.set(thisOP.states.FAILED); + throw e; + } + } else { + if (isUnDef(f) || f == null) thisOP.state.set(thisOP.states.FAILED); + } + } + } + } catch(ee) { + throw ee; + } finally { + //syncFn(() => { thisOP.executing.set(false); }, thisOP.executing.get()); + thisOP.executing.set(false) + thisOP.__thread = __ + + if (thisOP.executors.isEmpty()) { + thisOP.state.setIf(thisOP.states.NEW, thisOP.states.FULFILLED); + thisOP.state.setIf(thisOP.states.PREFAILED, thisOP.states.FAILED); + } + } - /*if (thisOP.state == thisOP.states.PREFAILED && thisOP.executors.isEmpty()) { - thisOP.state = thisOP.states.FAILED; - }*/ - } - })); - } catch(e) { - if (String(e).indexOf("RejectedExecutionException") < 0) throw e; - } - // Try again if null - } while(isUnDef(this.__f) || this.__f == null); + /*if (thisOP.state == thisOP.states.PREFAILED && thisOP.executors.isEmpty()) { + thisOP.state = thisOP.states.FAILED; + }*/ + } + })); + } catch(e) { + if (String(e).indexOf("RejectedExecutionException") < 0) throw e; + } + // Try again if null + } while(isUnDef(this.__f) || this.__f == null); }; @@ -13410,10 +13429,11 @@ const $doA2B = function(aAFn, aBFn, noc, defaultTimeout, aErrorFunction, useVirt * object will be immediatelly returned. Optionally this aFunction can receive a resolve and reject functions for to you use inside * aFunction to provide a result with resolve(aResult) or an exception with reject(aReason). If you don't call theses functions the * returned value will be used for resolve or any exception thrown will be use for reject. You can use the "then" method to add more - * aFunction that will execute once the previous as executed successfully (in a stack fashion). The return/resolve value from the + * aFunction that will execute once the previous as executed successfully (in a stack fashion). The return/resolve value from the * previous function will be passed as the value for the second. You can use the "catch" method to add aFunction that will receive * a string or exception for any exception thrown with the reject functions. You can also provide aRejFunction to work as a "catch" - * method as previously described before. + * method as previously described before. The returned oPromise also exposes a cancel method to interrupt the corresponding running + * thread (similar to the Threads plugin capabilities) and reject the promise chain. * */ const $do = function(aFunction, aRejFunction) { @@ -13425,7 +13445,9 @@ const $do = function(aFunction, aRejFunction) { * $doV(aFunction, aRejFunction) : oPromise * Equivalent to $do but using virtual threads (if available) to execute the aFunction. This will allow the aFunction to be executed * without blocking the current thread. If aFunction is not provided it will return a new oPromise that will be resolved - * when the first executor is added to it. If aRejFunction is provided it will be used as a "catch" method for the oPromise. + * when the first executor is added to it. If aRejFunction is provided it will be used as a "catch" method for the oPromise. The + * returned oPromise also exposes a cancel method to interrupt the corresponding running thread (similar to the Threads plugin + * capabilities) and reject the promise chain. * */ const $doV = function(aFunction, aRejFunction) { diff --git a/tests/autoTestAll.Generic.js b/tests/autoTestAll.Generic.js index 7a2d3103..968bb38f 100644 --- a/tests/autoTestAll.Generic.js +++ b/tests/autoTestAll.Generic.js @@ -659,7 +659,61 @@ ow.test.assert(res, false, "Problem with multiple $do().then().catch()"); }; - + + exports.testDoCancel = function() { + var interrupted = $atomic(false, "boolean") + var cancelled = $atomic(false, "boolean") + var start = now() + + var p = $do(() => { + while(true) { + sleep(1000, true) + if ((now() - start) > 5000) throw "timeout" + } + }).catch((reason) => { + if (String(reason) == "cancelled" || String(reason).toLowerCase().indexOf("interrupted") >= 0) { + cancelled.set(true) + interrupted.set(true) + } + $await("testDoCancel").notify() + }) + + $doWait(p, 1500) + var cRes = p.cancel("cancelled") + $await("testDoCancel").wait(2000) + + ow.test.assert(cRes, true, "Problem cancelling $do"); + ow.test.assert(interrupted.get(), true, "Problem interrupting $do thread") + ow.test.assert(cancelled.get(), true, "Problem propagating $do cancellation") + }; + + exports.testDoVCancel = function() { + var interrupted = $atomic(false, "boolean"); + var cancelled = $atomic(false, "boolean"); + var start = now(); + + var p = $doV(() => { + while(true) { + sleep(1000, true) + if ((now() - start) > 5000) throw "timeout" + } + }).catch((reason) => { + if (String(reason) == "cancelled" || String(reason).toLowerCase().indexOf("interrupted") >= 0) { + cancelled.set(true) + interrupted.set(true) + } + $await("testDoVCancel").notify() + }) + + $doWait(p, 1500) + var cRes = p.cancel("cancelled") + $await("testDoVCancel").wait(2000) + + ow.test.assert(cRes, true, "Problem cancelling $doV"); + ow.test.assert(interrupted.get(), true, "Problem interrupting $doV thread"); + ow.test.assert(cancelled.get(), true, "Problem propagating $doV cancellation"); + }; + exports.testDoAll = function() { var success = []; diff --git a/tests/autoTestAll.Generic.yaml b/tests/autoTestAll.Generic.yaml index 2479546f..1bb23c7a 100644 --- a/tests/autoTestAll.Generic.yaml +++ b/tests/autoTestAll.Generic.yaml @@ -110,15 +110,25 @@ jobs: to : oJob Test exec: args.func = args.tests.testObjectCompression; - - name: OpenAF::Test $do - from: OpenAF::Init - then: oJob Test - exec: args.func = args.tests.testDo; - - - name: OpenAF::Test $doAll - from: OpenAF::Init - then: oJob Test - exec: args.func = args.tests.testDoAll; + - name: OpenAF::Test $do + from: OpenAF::Init + then: oJob Test + exec: args.func = args.tests.testDo; + + - name: OpenAF::Test $do cancel + from: OpenAF::Init + then: oJob Test + exec: args.func = args.tests.testDoCancel; + + - name: OpenAF::Test $doV cancel + from: OpenAF::Init + then: oJob Test + exec: args.func = args.tests.testDoVCancel; + + - name: OpenAF::Test $doAll + from: OpenAF::Init + then: oJob Test + exec: args.func = args.tests.testDoAll; - name: OpenAF::Test $doFirst from: OpenAF::Init