Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 34 additions & 7 deletions skiplang/prelude/src/skstore/Context.sk
Original file line number Diff line number Diff line change
Expand Up @@ -2303,11 +2303,18 @@ class ConcurrentFork() extends .Exception {
}
}

fun mergeForkNoGc(
class MergeState(
settings: ForkSettings,
start: Tick,
contexts: ContextsWithActions,
fork: ?String,
)

fun startMergeForkNoGc(
name: String,
settings: ForkSettings = ForkSettings::keepall(),
synchronizer: ?ForkSynchronizer = None(),
): void {
): MergeState {
SKStore.gGlobalLock();
contexts = SKStore.gContextsGetNoLock();
if (!contexts.has(Some(name))) {
Expand Down Expand Up @@ -2340,18 +2347,38 @@ fun mergeForkNoGc(
new = contexts.check(fork.fork, new_context.clone(), fork.postponables);
!new.contexts = new.contexts.remove(name);
SKStore.gUnsafeFree(contexts);
sync = UInt32::truncate(if (settings.isSync) 1 else 0);
SKStore.gContextsReplaceNoLock(new.contexts, sync);
SKStore.gGlobalUnlock();
new.contexts.get(fork.fork).notifyAll(start, settings.ignoredSessions);
new.actions.each(fn -> fn())
MergeState(settings, start, new, fork.fork)
| Failure(ex) ->
SKStore.gUnsafeFree(contexts);
SKStore.gGlobalUnlock();
throw ex
}
}

fun endMergeForkNoGc(state: MergeState): void {
new = state.contexts;
sync = UInt32::truncate(if (state.settings.isSync) 1 else 0);
SKStore.gContextsReplaceNoLock(new.contexts, sync);
SKStore.gGlobalUnlock();
new.contexts
.get(state.fork)
.notifyAll(state.start, state.settings.ignoredSessions);
new.actions.each(fn -> fn())
}

fun abortMergeForkNoGc(_state: MergeState): void {
// The unused parameter is here the be call in merge context
SKStore.gGlobalUnlock();
}

fun mergeForkNoGc(
name: String,
settings: ForkSettings = ForkSettings::keepall(),
synchronizer: ?ForkSynchronizer = None(),
): void {
endMergeForkNoGc(startMergeForkNoGc(name, settings, synchronizer))
}

fun removeForkNoGc(name: String, isSync: Bool = false): void {
gGlobalLock();
contexts = gContextsGetNoLock();
Expand Down
19 changes: 19 additions & 0 deletions skiplang/prelude/ts/wasm/src/sk_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,25 @@ export class Utils {
}
};

async unsafeAsyncWithGc<T>(fn: () => Promise<T>): Promise<T> {
this.stddebug = [];
const obsPos = this.exports.SKIP_new_Obstack();
try {
const value = await fn();
// clone must be done before SKIP_destroy_Obstack
const res = cloneIfProxy(value);
this.exports.SKIP_destroy_Obstack(obsPos);
return res;
} catch (ex) {
this.exports.SKIP_destroy_Obstack(obsPos);
throw ex;
} finally {
if (this.stddebug.length > 0) {
console.log(this.stddebug.join(""));
}
}
}

getState<T>(name: string, create: () => T): T {
let state = this.states.get(name);
if (state == undefined) {
Expand Down
103 changes: 103 additions & 0 deletions skipruntime-ts/addon/src/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,109 @@ void RunWithGC(const v8::FunctionCallbackInfo<v8::Value>& args) {
SKIP_destroy_Obstack(obstack);
}

void UnsafeAsyncRunWithGC(const v8::FunctionCallbackInfo<v8::Value>& args) {
Isolate* isolate = args.GetIsolate();
HandleScope scope(isolate);
if (args.Length() != 1) {
isolate->ThrowException(
Exception::TypeError(FromUtf8(isolate, "Must have one parameters.")));
return;
};
if (!args[0]->IsFunction()) {
isolate->ThrowException(Exception::TypeError(
FromUtf8(isolate, "The parameter must be a function.")));
return;
}
SKObstack obstack = SKIP_new_Obstack();
Local<Context> context = isolate->GetCurrentContext();
TryCatch tryCatch(isolate);
MaybeLocal<Value> optResult =
args[0].As<Function>()->Call(context, Null(isolate), 0, nullptr);

if (tryCatch.HasCaught()) {
SKIP_destroy_Obstack(obstack);
tryCatch.ReThrow();
return;
}

Local<Value> result = optResult.ToLocalChecked();

if (result->IsPromise()) {
Local<v8::Promise> promise = result.As<v8::Promise>();

Local<External> obstackData = External::New(isolate, obstack);

Local<v8::Promise::Resolver> resolver =
v8::Promise::Resolver::New(context).ToLocalChecked();
Local<v8::Promise> wrapperPromise = resolver->GetPromise();

// Create handler data array with obstack and resolver
Local<Array> handlerData = Array::New(isolate, 2);
handlerData->Set(context, 0, obstackData).ToChecked();
handlerData->Set(context, 1, resolver).ToChecked();

// Create success handler that cleans up and resolves
Local<Function> onFulfilled =
Function::New(
context,
[](const v8::FunctionCallbackInfo<v8::Value>& info) {
Isolate* isolate = info.GetIsolate();
HandleScope scope(isolate);
Local<Context> context = isolate->GetCurrentContext();

Local<Array> data = info.Data().As<Array>();
SKObstack obstack = data->Get(context, 0)
.ToLocalChecked()
.As<External>()
->Value();
Local<v8::Promise::Resolver> resolver =
data->Get(context, 1)
.ToLocalChecked()
.As<v8::Promise::Resolver>();

SKIP_destroy_Obstack(obstack);

Local<Value> value = info[0];
resolver->Resolve(context, value).ToChecked();
},
handlerData)
.ToLocalChecked();

Local<Function> onRejected =
Function::New(
context,
[](const v8::FunctionCallbackInfo<v8::Value>& info) {
Isolate* isolate = info.GetIsolate();
HandleScope scope(isolate);
Local<Context> context = isolate->GetCurrentContext();

Local<Array> data = info.Data().As<Array>();
SKObstack obstack = data->Get(context, 0)
.ToLocalChecked()
.As<External>()
->Value();
Local<v8::Promise::Resolver> resolver =
data->Get(context, 1)
.ToLocalChecked()
.As<v8::Promise::Resolver>();

SKIP_destroy_Obstack(obstack);

Local<Value> error = info[0];
resolver->Reject(context, error).ToChecked();
},
handlerData)
.ToLocalChecked();

promise->Then(context, onFulfilled, onRejected).ToLocalChecked();

args.GetReturnValue().Set(wrapperPromise);
} else {
SKIP_destroy_Obstack(obstack);
args.GetReturnValue().Set(result);
}
}

void GetErrorObject(const v8::FunctionCallbackInfo<v8::Value>& args) {
Isolate* isolate = args.GetIsolate();
HandleScope scope(isolate);
Expand Down
2 changes: 2 additions & 0 deletions skipruntime-ts/addon/src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ char* CallJSNullableStringFunction(v8::Isolate*, v8::Local<v8::Object>,
const char*, int, v8::Local<v8::Value>[]);
void NatTryCatch(v8::Isolate*, std::function<void(v8::Isolate*)>);
void RunWithGC(const v8::FunctionCallbackInfo<v8::Value>&);
void UnsafeAsyncRunWithGC(const v8::FunctionCallbackInfo<v8::Value>&);
void GetErrorObject(const v8::FunctionCallbackInfo<v8::Value>&);
char* ToSKString(v8::Isolate*, v8::Local<v8::Value>);
void Print(v8::Isolate*, const char*, v8::Local<v8::Value>);
Expand Down Expand Up @@ -70,5 +71,6 @@ struct SkipException : std::exception {
#define SKNotifier void*
#define SKReducer void*
#define SKMapper void*
#define SKMergeState void*

#endif // SKCOMMON_H
2 changes: 2 additions & 0 deletions skipruntime-ts/addon/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type AddOn = {
getSkipRuntimeFromBinding: () => SkipRuntimeFromBinding;
initSkipRuntimeToBinding: (binding: ToBinding) => void;
runWithGC: <T>(fn: () => T) => T;
unsafeAsyncRunWithGC: <T>(fn: () => Promise<T>) => Promise<T>;
getErrorObject: (skExc: Pointer<IException>) => Error;
};

Expand All @@ -27,6 +28,7 @@ const fromBinding = skip_runtime.getSkipRuntimeFromBinding();
const tobinding = new ToBinding(
fromBinding,
skip_runtime.runWithGC,
skip_runtime.unsafeAsyncRunWithGC,
() => jsonConverter,
skip_runtime.getErrorObject,
);
Expand Down
2 changes: 2 additions & 0 deletions skipruntime-ts/addon/src/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ void InitToBinding(const v8::FunctionCallbackInfo<v8::Value>& args) {
void Initialize(v8::Local<v8::Object> exports, v8::Local<v8::Value> module,
void* context) {
NODE_SET_METHOD(exports, "runWithGC", skbinding::RunWithGC);
NODE_SET_METHOD(exports, "unsafeAsyncRunWithGC",
skbinding::UnsafeAsyncRunWithGC);
NODE_SET_METHOD(exports, "getErrorObject", skbinding::GetErrorObject);
NODE_SET_METHOD(exports, "getJsonBinding", skjson::GetBinding);
NODE_SET_METHOD(exports, "getSkipRuntimeFromBinding",
Expand Down
76 changes: 76 additions & 0 deletions skipruntime-ts/addon/src/tojs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ CJSON SkipRuntime_Runtime__getForKey(char* resource, CJObject jsonParams,
CJSON SkipRuntime_Runtime__update(char* input, CJSON values);
double SkipRuntime_Runtime__fork(char* input);
double SkipRuntime_Runtime__merge(CJArray);
SKMergeState SkipRuntime_Runtime__startMerge(CJArray);
double SkipRuntime_Runtime__endMerge(SKMergeState);
double SkipRuntime_Runtime__abortMerge(SKMergeState);

double SkipRuntime_Runtime__abortFork();
uint32_t SkipRuntime_Runtime__forkExists(char* input);
CJSON SkipRuntime_Runtime__reload(SKService service);
Expand Down Expand Up @@ -902,6 +906,72 @@ void MergeOfRuntime(const FunctionCallbackInfo<Value>& args) {
});
}

void StartMergeOfRuntime(const FunctionCallbackInfo<Value>& args) {
Isolate* isolate = args.GetIsolate();
HandleScope scope(isolate);
if (args.Length() != 1) {
// Throw an Error that is passed back to JavaScript
isolate->ThrowException(
Exception::TypeError(FromUtf8(isolate, "Must have one parameter.")));
return;
};
if (!args[0]->IsExternal()) {
// Throw an Error that is passed back to JavaScript
isolate->ThrowException(Exception::TypeError(
FromUtf8(isolate, "The parameter must be a pointer.")));
return;
};
NatTryCatch(isolate, [&args](Isolate* isolate) {
SKMergeState skstate =
SkipRuntime_Runtime__startMerge(args[0].As<External>()->Value());
args.GetReturnValue().Set(External::New(isolate, skstate));
});
}

void EndMergeOfRuntime(const FunctionCallbackInfo<Value>& args) {
Isolate* isolate = args.GetIsolate();
HandleScope scope(isolate);
if (args.Length() != 1) {
// Throw an Error that is passed back to JavaScript
isolate->ThrowException(
Exception::TypeError(FromUtf8(isolate, "Must have one parameter.")));
return;
};
if (!args[0]->IsExternal()) {
// Throw an Error that is passed back to JavaScript
isolate->ThrowException(Exception::TypeError(
FromUtf8(isolate, "The parameter must be a pointer.")));
return;
};
NatTryCatch(isolate, [&args](Isolate* isolate) {
double skerror =
SkipRuntime_Runtime__endMerge(args[0].As<External>()->Value());
args.GetReturnValue().Set(Number::New(isolate, skerror));
});
}

void AbortMergeOfRuntime(const FunctionCallbackInfo<Value>& args) {
Isolate* isolate = args.GetIsolate();
HandleScope scope(isolate);
if (args.Length() != 1) {
// Throw an Error that is passed back to JavaScript
isolate->ThrowException(
Exception::TypeError(FromUtf8(isolate, "Must have one parameter.")));
return;
};
if (!args[0]->IsExternal()) {
// Throw an Error that is passed back to JavaScript
isolate->ThrowException(Exception::TypeError(
FromUtf8(isolate, "The parameter must be a pointer.")));
return;
};
NatTryCatch(isolate, [&args](Isolate* isolate) {
double skerror =
SkipRuntime_Runtime__abortMerge(args[0].As<External>()->Value());
args.GetReturnValue().Set(Number::New(isolate, skerror));
});
}

void AbortForkOfRuntime(const FunctionCallbackInfo<Value>& args) {
Isolate* isolate = args.GetIsolate();
HandleScope scope(isolate);
Expand Down Expand Up @@ -1043,6 +1113,12 @@ void GetToJSBinding(const FunctionCallbackInfo<Value>& args) {
AddFunction(isolate, binding, "SkipRuntime_Runtime__update", UpdateOfRuntime);
AddFunction(isolate, binding, "SkipRuntime_Runtime__fork", ForkOfRuntime);
AddFunction(isolate, binding, "SkipRuntime_Runtime__merge", MergeOfRuntime);
AddFunction(isolate, binding, "SkipRuntime_Runtime__startMerge",
StartMergeOfRuntime);
AddFunction(isolate, binding, "SkipRuntime_Runtime__endMerge",
EndMergeOfRuntime);
AddFunction(isolate, binding, "SkipRuntime_Runtime__abortMerge",
AbortMergeOfRuntime);
AddFunction(isolate, binding, "SkipRuntime_Runtime__abortFork",
AbortForkOfRuntime);
AddFunction(isolate, binding, "SkipRuntime_Runtime__forkExists",
Expand Down
9 changes: 7 additions & 2 deletions skipruntime-ts/core/src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,11 @@ export interface Resource<
): EagerCollection<Json, Json>;
}

export interface Store<K extends Json, V extends Json> {
load(): Promise<Entry<K, V>[]>;
save(data: Entry<K, V>[]): Promise<void>;
}

/**
* Initial data for a service's input collections.
*
Expand All @@ -492,8 +497,8 @@ export interface Resource<
*/
export type InitialData<Inputs extends NamedCollections> = {
[Name in keyof Inputs]: Inputs[Name] extends EagerCollection<infer K, infer V>
? Entry<K, V>[]
: Entry<Json, Json>[];
? Entry<K, V>[] | Store<K, V>
: Entry<Json, Json>[] | Store<Json, Json>;
};

/**
Expand Down
9 changes: 9 additions & 0 deletions skipruntime-ts/core/src/binding.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,15 @@ export interface FromBinding {
SkipRuntime_Runtime__merge(
ignore: Pointer<Internal.CJArray<Internal.CJString>>,
): Handle<Error>;
SkipRuntime_Runtime__startMerge(
ignore: Pointer<Internal.CJArray<Internal.CJString>>,
): Pointer<Internal.MergeState>;
SkipRuntime_Runtime__endMerge(
state: Pointer<Internal.MergeState>,
): Handle<Error>;
SkipRuntime_Runtime__abortMerge(
state: Pointer<Internal.MergeState>,
): Handle<Error>;
SkipRuntime_Runtime__abortFork(): Handle<Error>;
SkipRuntime_Runtime__forkExists(name: string): boolean;

Expand Down
Loading