Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions docs/openaf-advanced.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,10 @@ Favor built-in job shortcuts `(if)`, `(repeat)`, `(each)`, `(parallel)` to minim

## 14. Asynchronous Execution with oPromise

- **$do** – queue work on the standard ForkJoin-backed pool and receive an `oPromise` for fluent `.then` / `.catch` composition. The resolver passed into your function can resolve with returned values or explicit `resolve()` calls, while thrown errors or `reject()` calls route to the rejection chain.【F:js/openaf.js†L13130-L13157】【F:js/openaf.js†L12208-L12251】
- **$doV** – same contract as `$do` but targets a virtual-thread-per-task executor so launching many concurrent tasks will not consume native threads when the JVM supports Project Loom virtual threads.【F:js/openaf.js†L12145-L12163】【F:js/openaf.js†L13148-L13157】
- **Coordination helpers** – mix `$doAll` / `$doFirst` (wrappers over `oPromise.all()` / `.race()`) to wait for all tasks or the first completion, enabling fan-out/fan-in patterns without manual synchronization primitives.【F:js/openaf.js†L13159-L13179】【F:js/openaf.js†L12253-L12348】
- **$do** – queue work on the standard ForkJoin-backed pool and receive an `oPromise` for fluent `.then` / `.catch` composition. The resolver passed into your function can resolve with returned values or explicit `resolve()` calls, while thrown errors or `reject()` calls route to the rejection chain.【F:js/openaf.js†L13426-L13455】【F:js/openaf.js†L12472-L12528】
- **$doV** – same contract as `$do` but targets a virtual-thread-per-task executor so launching many concurrent tasks will not consume native threads when the JVM supports Project Loom virtual threads.【F:js/openaf.js†L13444-L13455】【F:js/openaf.js†L12421-L12438】
- **Coordination helpers** – mix `$doAll` / `$doFirst` (wrappers over `oPromise.all()` / `.race()`) to wait for all tasks or the first completion, enabling fan-out/fan-in patterns without manual synchronization primitives.【F:js/openaf.js†L13459-L13479】【F:js/openaf.js†L12532-L12589】
- **Cancellation** – call `.cancel()` on any `$do` / `$doV` promise to interrupt the associated thread (mirroring the Threads plugin) and drive the chain into the rejection path for cleanup.【F:js/openaf.js†L13426-L13455】【F:js/openaf.js†L12664-L12683】

Example fan-out flow using virtual threads:

Expand Down
204 changes: 113 additions & 91 deletions js/openaf.js
Original file line number Diff line number Diff line change
Expand Up @@ -12481,15 +12481,16 @@ const __getThreadPools = function() {
* Optionally if useVirtualThreads is true, the aFunction will be executed in a virtual thread, otherwise it will be executed in a normal thread.
* </odoc>
*/
const oPromise = function(aFunction, aRejFunction, useVirtualThreads) {
this.states = {
NEW: 0, FULFILLED: 1, PREFAILED: 2, FAILED: 3
};

this.state = $atomic(this.states.NEW, "int");
this.executing = $atomic(false, "boolean");
this.executors = new java.util.concurrent.ConcurrentLinkedQueue();
this.vThreads = useVirtualThreads || false
const oPromise = function(aFunction, aRejFunction, useVirtualThreads) {
this.states = {
NEW: 0, FULFILLED: 1, PREFAILED: 2, FAILED: 3
};

this.state = $atomic(this.states.NEW, "int");
this.executing = $atomic(false, "boolean");
this.executors = new java.util.concurrent.ConcurrentLinkedQueue();
this.vThreads = useVirtualThreads || false
this.__thread = __

this.then(aFunction, aRejFunction);
};
Expand Down Expand Up @@ -12660,92 +12661,110 @@ oPromise.prototype.resolve = function(aValue) {
return this;
};

oPromise.prototype.cancel = function() {
if (isDef(this.__f)) {
return this.__f.cancel(true);
}
};
oPromise.prototype.cancel = function(aReason) {
if (this.state.get() == this.states.FULFILLED || this.state.get() == this.states.FAILED) return false

this.reason = isDef(aReason) ? aReason : "cancelled"
this.state.set(this.states.PREFAILED)

var cancelled = false

try {
if (isDef(this.__f)) cancelled = this.__f.cancel(true)
} catch(e) {}

try {
if (isDef(this.__thread) && this.__thread.isAlive()) { this.__thread.interrupt(); cancelled = true }
} catch(e) {}

this.__exec()

return cancelled
}

oPromise.prototype.__exec = function() {
var thisOP = this;

do {
try {
this.__f = __getThreadPool(this.vThreads).submit(new java.lang.Runnable({
run: () => {
//var ignore = false;
//syncFn(() => { if (thisOP.executing.get()) ignore = true; else thisOP.executing.set(true); }, thisOP.executing.get());
if (!thisOP.executing.setIf(false, true)) return
//if (ignore) return;

try {
while (thisOP.executors.size() > 0) {
var f = thisOP.executors.poll();
// Exec
if (thisOP.state.get() != thisOP.states.PREFAILED &&
thisOP.state.get() != thisOP.states.FAILED &&
f != null && isDef(f) && f.type == "exec" && isDef(f.func) && isFunction(f.func)) {
var res, done = false;
try {
var checkResult = $atomic(true, "boolean");
if (isDef(thisOP.value)) {
res = f.func(thisOP.value);
} else {
res = f.func(function (v) { checkResult.set(false); thisOP.resolve(v); },
function (r) { checkResult.set(false); thisOP.reject(r); });
}

if (checkResult.get() &&
(isJavaObject(res) || isDef(res)) &&
res != null &&
(thisOP.state.get() == thisOP.states.NEW || thisOP.state.get() == thisOP.states.FULFILLED)) {
res = thisOP.resolve(res);
}
} catch (e) {
thisOP.reject(e);
}
}
// Reject
if (thisOP.state.get() == thisOP.states.PREFAILED || thisOP.state.get() == thisOP.states.FAILED) {
while (f != null && isDef(f) && f.type != "reject" && isDef(f.func) && isFunction(f.func)) {
f = thisOP.executors.poll();
}
do {
try {
this.__f = __getThreadPool(this.vThreads).submit(new java.lang.Runnable({
run: () => {
//var ignore = false;
//syncFn(() => { if (thisOP.executing.get()) ignore = true; else thisOP.executing.set(true); }, thisOP.executing.get());
if (!thisOP.executing.setIf(false, true)) return
//if (ignore) return;

if (f != null && isDef(f) && isDef(f.func) && isFunction(f.func)) {
try {
f.func(thisOP.reason);
thisOP.state.set(thisOP.states.FULFILLED);
} catch (e) {
thisOP.state.set(thisOP.states.FAILED);
throw e;
}
} else {
if (isUnDef(f) || f == null) thisOP.state.set(thisOP.states.FAILED);
}
}
}
} catch(ee) {
throw ee;
} finally {
//syncFn(() => { thisOP.executing.set(false); }, thisOP.executing.get());
thisOP.executing.set(false)
thisOP.__thread = java.lang.Thread.currentThread()

if (thisOP.executors.isEmpty()) {
thisOP.state.setIf(thisOP.states.NEW, thisOP.states.FULFILLED);
thisOP.state.setIf(thisOP.states.PREFAILED, thisOP.states.FAILED);
}
}
try {
while (thisOP.executors.size() > 0) {
var f = thisOP.executors.poll();
// Exec
if (thisOP.state.get() != thisOP.states.PREFAILED &&
thisOP.state.get() != thisOP.states.FAILED &&
f != null && isDef(f) && f.type == "exec" && isDef(f.func) && isFunction(f.func)) {
var res, done = false;
try {
var checkResult = $atomic(true, "boolean");
if (isDef(thisOP.value)) {
res = f.func(thisOP.value);
} else {
res = f.func(function (v) { checkResult.set(false); thisOP.resolve(v); },
function (r) { checkResult.set(false); thisOP.reject(r); });
}

if (checkResult.get() &&
(isJavaObject(res) || isDef(res)) &&
res != null &&
(thisOP.state.get() == thisOP.states.NEW || thisOP.state.get() == thisOP.states.FULFILLED)) {
res = thisOP.resolve(res);
}
} catch (e) {
thisOP.reject(e);
}
}
// Reject
if (thisOP.state.get() == thisOP.states.PREFAILED || thisOP.state.get() == thisOP.states.FAILED) {
while (f != null && isDef(f) && f.type != "reject" && isDef(f.func) && isFunction(f.func)) {
f = thisOP.executors.poll();
}

if (f != null && isDef(f) && isDef(f.func) && isFunction(f.func)) {
try {
f.func(thisOP.reason);
thisOP.state.set(thisOP.states.FULFILLED);
} catch (e) {
thisOP.state.set(thisOP.states.FAILED);
throw e;
}
} else {
if (isUnDef(f) || f == null) thisOP.state.set(thisOP.states.FAILED);
}
}
}
} catch(ee) {
throw ee;
} finally {
//syncFn(() => { thisOP.executing.set(false); }, thisOP.executing.get());
thisOP.executing.set(false)
thisOP.__thread = __

if (thisOP.executors.isEmpty()) {
thisOP.state.setIf(thisOP.states.NEW, thisOP.states.FULFILLED);
thisOP.state.setIf(thisOP.states.PREFAILED, thisOP.states.FAILED);
}
}

/*if (thisOP.state == thisOP.states.PREFAILED && thisOP.executors.isEmpty()) {
thisOP.state = thisOP.states.FAILED;
}*/
}
}));
} catch(e) {
if (String(e).indexOf("RejectedExecutionException") < 0) throw e;
}
// Try again if null
} while(isUnDef(this.__f) || this.__f == null);
/*if (thisOP.state == thisOP.states.PREFAILED && thisOP.executors.isEmpty()) {
thisOP.state = thisOP.states.FAILED;
}*/
}
}));
} catch(e) {
if (String(e).indexOf("RejectedExecutionException") < 0) throw e;
}
// Try again if null
} while(isUnDef(this.__f) || this.__f == null);

};

Expand Down Expand Up @@ -13410,10 +13429,11 @@ const $doA2B = function(aAFn, aBFn, noc, defaultTimeout, aErrorFunction, useVirt
* object will be immediatelly returned. Optionally this aFunction can receive a resolve and reject functions for to you use inside
* aFunction to provide a result with resolve(aResult) or an exception with reject(aReason). If you don't call theses functions the
* returned value will be used for resolve or any exception thrown will be use for reject. You can use the "then" method to add more
* aFunction that will execute once the previous as executed successfully (in a stack fashion). The return/resolve value from the
* aFunction that will execute once the previous as executed successfully (in a stack fashion). The return/resolve value from the
* previous function will be passed as the value for the second. You can use the "catch" method to add aFunction that will receive
* a string or exception for any exception thrown with the reject functions. You can also provide aRejFunction to work as a "catch"
* method as previously described before.
* method as previously described before. The returned oPromise also exposes a cancel method to interrupt the corresponding running
* thread (similar to the Threads plugin capabilities) and reject the promise chain.
* </odoc>
*/
const $do = function(aFunction, aRejFunction) {
Expand All @@ -13425,7 +13445,9 @@ const $do = function(aFunction, aRejFunction) {
* <key>$doV(aFunction, aRejFunction) : oPromise</key>
* Equivalent to $do but using virtual threads (if available) to execute the aFunction. This will allow the aFunction to be executed
* without blocking the current thread. If aFunction is not provided it will return a new oPromise that will be resolved
* when the first executor is added to it. If aRejFunction is provided it will be used as a "catch" method for the oPromise.
* when the first executor is added to it. If aRejFunction is provided it will be used as a "catch" method for the oPromise. The
* returned oPromise also exposes a cancel method to interrupt the corresponding running thread (similar to the Threads plugin
* capabilities) and reject the promise chain.
* </odoc>
*/
const $doV = function(aFunction, aRejFunction) {
Expand Down
56 changes: 55 additions & 1 deletion tests/autoTestAll.Generic.js
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,61 @@

ow.test.assert(res, false, "Problem with multiple $do().then().catch()");
};


exports.testDoCancel = function() {
var interrupted = $atomic(false, "boolean")
var cancelled = $atomic(false, "boolean")
var start = now()

var p = $do(() => {
while(true) {
sleep(1000, true)
if ((now() - start) > 5000) throw "timeout"
}
}).catch((reason) => {
if (String(reason) == "cancelled" || String(reason).toLowerCase().indexOf("interrupted") >= 0) {
cancelled.set(true)
interrupted.set(true)
}
$await("testDoCancel").notify()
})

$doWait(p, 1500)
var cRes = p.cancel("cancelled")
$await("testDoCancel").wait(2000)

ow.test.assert(cRes, true, "Problem cancelling $do");
ow.test.assert(interrupted.get(), true, "Problem interrupting $do thread")
ow.test.assert(cancelled.get(), true, "Problem propagating $do cancellation")
};

exports.testDoVCancel = function() {
var interrupted = $atomic(false, "boolean");
var cancelled = $atomic(false, "boolean");
var start = now();

var p = $doV(() => {
while(true) {
sleep(1000, true)
if ((now() - start) > 5000) throw "timeout"
}
}).catch((reason) => {
if (String(reason) == "cancelled" || String(reason).toLowerCase().indexOf("interrupted") >= 0) {
cancelled.set(true)
interrupted.set(true)
}
$await("testDoVCancel").notify()
})

$doWait(p, 1500)
var cRes = p.cancel("cancelled")
$await("testDoVCancel").wait(2000)

ow.test.assert(cRes, true, "Problem cancelling $doV");
ow.test.assert(interrupted.get(), true, "Problem interrupting $doV thread");
ow.test.assert(cancelled.get(), true, "Problem propagating $doV cancellation");
};

exports.testDoAll = function() {
var success = [];

Expand Down
28 changes: 19 additions & 9 deletions tests/autoTestAll.Generic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,25 @@ jobs:
to : oJob Test
exec: args.func = args.tests.testObjectCompression;

- name: OpenAF::Test $do
from: OpenAF::Init
then: oJob Test
exec: args.func = args.tests.testDo;

- name: OpenAF::Test $doAll
from: OpenAF::Init
then: oJob Test
exec: args.func = args.tests.testDoAll;
- name: OpenAF::Test $do
from: OpenAF::Init
then: oJob Test
exec: args.func = args.tests.testDo;

- name: OpenAF::Test $do cancel
from: OpenAF::Init
then: oJob Test
exec: args.func = args.tests.testDoCancel;

- name: OpenAF::Test $doV cancel
from: OpenAF::Init
then: oJob Test
exec: args.func = args.tests.testDoVCancel;

- name: OpenAF::Test $doAll
from: OpenAF::Init
then: oJob Test
exec: args.func = args.tests.testDoAll;

- name: OpenAF::Test $doFirst
from: OpenAF::Init
Expand Down